Python gRPC отменяет вызов унарного потока со стороны клиента

#python #grpc #grpc-python

#python #grpc #grpc-python

Вопрос:

Используя Python gRPC, я хотел бы иметь возможность отменить длительный вызов унарного потока со стороны клиента, когда threading.Event установлено.

 def application(stub: StreamsStub, event: threading.Event):
    stream = stub.Application(ApplicationStreamRequest())
    try:
        for resp in stream:
            print(resp)
    except grpc.RpcError as e:
        print(e)
  

В настоящее время я отменяю поток, используя channel.close() метод, но, конечно, это закрывает все соединения, а не только этот поток.

Может кто-нибудь подсказать, как я могу использовать событие для отмены итератора потока? Спасибо

Ответ №1:

Ниже приведен некоторый код для однонаправленного вызова gRPC. Сервер отправляет бесконечное количество ответов, оставляя клиенту решать, когда прекратить их получение.

Вместо использования счетчика вы можете отключить поток и выполнить некоторую работу, а также установить событие, которое проверяется перед вызовом cancel(), вместо проверки счетчика.

Примечание: использование Python 2.7

Протофайл:

 syntax = "proto3";

package my_package;

service HeartBeat {
    rpc Beats(Counter) returns (stream Counter) {}
}

message Counter {
    int32 counter = 1;
}
  

Клиент:

 from __future__ import print_function

import grpc
import heartbeat_pb2
import heartbeat_pb2_grpc


def get_beats(stub, channel):
    try:
        result_iterator = stub.Beats(heartbeat_pb2.Counter(counter=i))
        for result in result_iterator:
            print("Count: {}".format(result.counter))
            if result.counter >= 3: # We only wants 3 'beats'
                result_iterator.cancel()
    except grpc.RpcError as rpc_error:
        if rpc_error.code() == grpc.StatusCode.CANCELLED:
            pass # Otherwise, a traceback is printed


def run():
    with grpc.insecure_channel('localhost:9999') as channel:
        stub = heartbeat_pb2_grpc.HeartBeatStub(channel)
        get_beats(stub, channel)


if __name__ == '__main__':
    run()
  

Сервер:

 from concurrent import futures

import grpc
from proto_generated import heartbeat_pb2
from proto_generated import heartbeat_pb2_grpc
import time


class HeartBeatServicer(heartbeat_pb2_grpc.HeartBeatServicer):
    pass


    def Beats(self, request, context):
        # Not required, only to show sending the server a message
        print("Beats: {}".format(request.counter))

        def response_message():
            i = 0
            while context.is_active():
                print("Sending {}".format(i))
                response = heartbeat_pb2.Counter(counter=i)
                i  = 1
                time.sleep(1)  # Simulate doing work
                yield response

        return response_message()


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    heartbeat_pb2_grpc.add_HeartBeatServicer_to_server(
        HeartBeatServicer(), server)
    server.add_insecure_port('[::]:9999')
    server.start()
    server.wait_for_termination()


if __name__ == '__main__':
    serve()
  

Комментарии:

1. Особая благодарность Nordine Lotfi (пользователь 12349101) и MisterMiyagi (пользователь 5349916) за их помощь в чате SO Python за их помощь.

Ответ №2:

_Rendezvous Объект, возвращаемый вызовом rpc, реализует grpc.RpcError , grpc.Future и grpc.Call , поэтому отменить поток так же просто, как вызвать stream.cancel (из grpc.Future интерфейса)