Как настроить управление потоком gRPC HTTP / 2 в python

#python #grpc #http2 #grpc-python

#python #grpc #http2 #grpc-python

Вопрос:

У меня есть сервер gRPC со следующим прототипом:

 syntax = "proto3";

service MyServicer {
  rpc DoSomething(stream InputBigData) returns (stream OutputBigData) {}
}
message InputBigData {
    bytes data = 1;
}
message OutputBigData {
    bytes data = 1;
}
  

И мой сервер создается с помощью следующего кода Python:

 server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
                     options=[('grpc.max_receive_message_length', -1),
                              ('grpc.max_send_message_length', -1))])
  

max_receive_message_length и max_send_message_length имеют значение -1, чтобы разрешить передачу больших сообщений (обычно 8 МБ). Клиент также определяет те же параметры.

Пример 1: Рассмотрим, что клиент отправляет на сервер InputBigData с более высокой скоростью, чем может позволить сервер. Как я могу настроить, сколько входных данных (или байтов) может быть помещено в очередь во входном потоке?

Пример 2: Предположим, что клиент считывает ответ OutputBigData с сервера с более низкой скоростью, чем может позволить себе клиент. Как я могу настроить, сколько OutputBigData (или байтов) может быть помещено в очередь в выходном потоке?

Я знаю, что управление потоком gRPC основано на HTTP / 2: https://httpwg.org/specs/rfc7540.html#FlowControl Я попытался установить grpc.http2.write_buffer_size на 67108864 (кажется, это максимальное значение), но ничего не произошло.

Вот реализация, которая освещает случай 2:

 # server.py
from concurrent import futures

import grpc
import myservicer_pb2_grpc, myservicer_pb2


class MyServicer(myservicer_pb2_grpc.MyServicer):

    def DoSomething(self, request_iterator, target, **kwargs):
        big_data = b'0' * 1920*1080*4
        for r in request_iterator:
            print("server received input big data")
            yield myservicer_pb2.OutputBigData(data=big_data)
            print("server sent output big data")


if __name__ == '__main__':
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
                         options=[('grpc.max_receive_message_length', -1),
                                  ('grpc.max_send_message_length', -1)])
    myservicer_pb2_grpc.add_MyServicerServicer_to_server(
        MyServicer(), server)
    server.add_insecure_port("[::]:50051")
    server.start()
    server.wait_for_termination()

  
 # client.py
import time
import grpc

import myservicer_pb2_grpc
import myservicer_pb2


def big_data_generator():
    big_data = b'0' * 1920*1080*4
    for i in range(100):
        yield myservicer_pb2.InputBigData(data=big_data)


def run():
    with grpc.insecure_channel('localhost:50051',
                               options=[('grpc.max_send_message_length', -1),
                                        ('grpc.max_receive_message_length', -1)]) as channel:
        stub = myservicer_pb2_grpc.MyServicerStub(channel)
        res = stub.DoSomething(big_data_generator())

        for r in res:
            print("Client received data")
            time.sleep(10)

if __name__ == '__main__':
    run()
  

Через 10 секунд мой сервер выводит:

 server received input big data
server sent output big data
server received input big data
server sent output big data
server received input big data
  

И мой вывод клиента:

 Client received data
  

Мой сервер получил 3 входных данных и отправил 2 выходных данных. Теперь он заблокирован до тех пор, пока клиент не получит выходные данные. В этом сценарии я хочу увеличить (в 2 или 3 раза) размер выходного буфера, чтобы он мог продолжать обрабатывать больше входных данных, даже если клиент запаздывает с получением результата.

Ответ №1:

Спасибо за подробный вопрос. Я попробовал ваш пример, но все еще не могу настроить gRPC на свободное увеличение размера его окна.

Аргументы канала gRPC можно найти здесь . Реализация управления потоком здесь, есть только несколько, которые могут повлиять на управление потоком, а именно:

  • grpc.http2.bdp_probe=0 : отключает автоматическое увеличение окна
  • grpc.http2.max_frame_size : Максимальный размер кадра HTTP / 2
  • grpc.http2.write_buffer_size : На самом деле это не опция управления потоком, она используется для GRPC_WRITE_BUFFER_HINT (запись без блокировки). Кроме того, GRPC_WRITE_BUFFER_HINT еще не поддерживается в gRPC Python

Нет аргумента, который мог бы вызвать обновление размера окна. Размер окна по умолчанию составляет 64 КБ. gRPC увеличит размер окна с помощью оценки BDP. Например, на моем ноутбуке размер окна, исходящего от клиента, увеличился до 8380679 (~ 8 МБ). Но мне еще предстоит найти способ вручную вмешаться в этот процесс.

Итак, к сожалению, вам может потребоваться буферизация на уровне приложения. Вы можете использовать сопрограммы в asyncio или потоковую обработку с потокобезопасной очередью как на стороне клиента, так и на стороне сервера.