#python #grpc #http2 #grpc-python
#python #grpc #http2 #grpc-python
Вопрос:
У меня есть сервер gRPC со следующим прототипом:
syntax = "proto3";
service MyServicer {
rpc DoSomething(stream InputBigData) returns (stream OutputBigData) {}
}
message InputBigData {
bytes data = 1;
}
message OutputBigData {
bytes data = 1;
}
И мой сервер создается с помощью следующего кода Python:
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
options=[('grpc.max_receive_message_length', -1),
('grpc.max_send_message_length', -1))])
max_receive_message_length и max_send_message_length имеют значение -1, чтобы разрешить передачу больших сообщений (обычно 8 МБ). Клиент также определяет те же параметры.
Пример 1: Рассмотрим, что клиент отправляет на сервер InputBigData с более высокой скоростью, чем может позволить сервер. Как я могу настроить, сколько входных данных (или байтов) может быть помещено в очередь во входном потоке?
Пример 2: Предположим, что клиент считывает ответ OutputBigData с сервера с более низкой скоростью, чем может позволить себе клиент. Как я могу настроить, сколько OutputBigData (или байтов) может быть помещено в очередь в выходном потоке?
Я знаю, что управление потоком gRPC основано на HTTP / 2: https://httpwg.org/specs/rfc7540.html#FlowControl Я попытался установить grpc.http2.write_buffer_size на 67108864 (кажется, это максимальное значение), но ничего не произошло.
Вот реализация, которая освещает случай 2:
# server.py
from concurrent import futures
import grpc
import myservicer_pb2_grpc, myservicer_pb2
class MyServicer(myservicer_pb2_grpc.MyServicer):
def DoSomething(self, request_iterator, target, **kwargs):
big_data = b'0' * 1920*1080*4
for r in request_iterator:
print("server received input big data")
yield myservicer_pb2.OutputBigData(data=big_data)
print("server sent output big data")
if __name__ == '__main__':
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
options=[('grpc.max_receive_message_length', -1),
('grpc.max_send_message_length', -1)])
myservicer_pb2_grpc.add_MyServicerServicer_to_server(
MyServicer(), server)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()
# client.py
import time
import grpc
import myservicer_pb2_grpc
import myservicer_pb2
def big_data_generator():
big_data = b'0' * 1920*1080*4
for i in range(100):
yield myservicer_pb2.InputBigData(data=big_data)
def run():
with grpc.insecure_channel('localhost:50051',
options=[('grpc.max_send_message_length', -1),
('grpc.max_receive_message_length', -1)]) as channel:
stub = myservicer_pb2_grpc.MyServicerStub(channel)
res = stub.DoSomething(big_data_generator())
for r in res:
print("Client received data")
time.sleep(10)
if __name__ == '__main__':
run()
Через 10 секунд мой сервер выводит:
server received input big data
server sent output big data
server received input big data
server sent output big data
server received input big data
И мой вывод клиента:
Client received data
Мой сервер получил 3 входных данных и отправил 2 выходных данных. Теперь он заблокирован до тех пор, пока клиент не получит выходные данные. В этом сценарии я хочу увеличить (в 2 или 3 раза) размер выходного буфера, чтобы он мог продолжать обрабатывать больше входных данных, даже если клиент запаздывает с получением результата.
Ответ №1:
Спасибо за подробный вопрос. Я попробовал ваш пример, но все еще не могу настроить gRPC на свободное увеличение размера его окна.
Аргументы канала gRPC можно найти здесь . Реализация управления потоком здесь, есть только несколько, которые могут повлиять на управление потоком, а именно:
grpc.http2.bdp_probe=0
: отключает автоматическое увеличение окнаgrpc.http2.max_frame_size
: Максимальный размер кадра HTTP / 2grpc.http2.write_buffer_size
: На самом деле это не опция управления потоком, она используется для GRPC_WRITE_BUFFER_HINT (запись без блокировки). Кроме того, GRPC_WRITE_BUFFER_HINT еще не поддерживается в gRPC Python
Нет аргумента, который мог бы вызвать обновление размера окна. Размер окна по умолчанию составляет 64 КБ. gRPC увеличит размер окна с помощью оценки BDP. Например, на моем ноутбуке размер окна, исходящего от клиента, увеличился до 8380679 (~ 8 МБ). Но мне еще предстоит найти способ вручную вмешаться в этот процесс.
Итак, к сожалению, вам может потребоваться буферизация на уровне приложения. Вы можете использовать сопрограммы в asyncio или потоковую обработку с потокобезопасной очередью как на стороне клиента, так и на стороне сервера.