#python #grpc #grpc-python
#python #grpc #grpc-python
Вопрос:
Используя Python gRPC, я хотел бы иметь возможность отменить длительный вызов унарного потока со стороны клиента, когда threading.Event
установлено.
def application(stub: StreamsStub, event: threading.Event):
stream = stub.Application(ApplicationStreamRequest())
try:
for resp in stream:
print(resp)
except grpc.RpcError as e:
print(e)
В настоящее время я отменяю поток, используя channel.close()
метод, но, конечно, это закрывает все соединения, а не только этот поток.
Может кто-нибудь подсказать, как я могу использовать событие для отмены итератора потока? Спасибо
Ответ №1:
Ниже приведен некоторый код для однонаправленного вызова gRPC. Сервер отправляет бесконечное количество ответов, оставляя клиенту решать, когда прекратить их получение.
Вместо использования счетчика вы можете отключить поток и выполнить некоторую работу, а также установить событие, которое проверяется перед вызовом cancel(), вместо проверки счетчика.
Примечание: использование Python 2.7
Протофайл:
syntax = "proto3";
package my_package;
service HeartBeat {
rpc Beats(Counter) returns (stream Counter) {}
}
message Counter {
int32 counter = 1;
}
Клиент:
from __future__ import print_function
import grpc
import heartbeat_pb2
import heartbeat_pb2_grpc
def get_beats(stub, channel):
try:
result_iterator = stub.Beats(heartbeat_pb2.Counter(counter=i))
for result in result_iterator:
print("Count: {}".format(result.counter))
if result.counter >= 3: # We only wants 3 'beats'
result_iterator.cancel()
except grpc.RpcError as rpc_error:
if rpc_error.code() == grpc.StatusCode.CANCELLED:
pass # Otherwise, a traceback is printed
def run():
with grpc.insecure_channel('localhost:9999') as channel:
stub = heartbeat_pb2_grpc.HeartBeatStub(channel)
get_beats(stub, channel)
if __name__ == '__main__':
run()
Сервер:
from concurrent import futures
import grpc
from proto_generated import heartbeat_pb2
from proto_generated import heartbeat_pb2_grpc
import time
class HeartBeatServicer(heartbeat_pb2_grpc.HeartBeatServicer):
pass
def Beats(self, request, context):
# Not required, only to show sending the server a message
print("Beats: {}".format(request.counter))
def response_message():
i = 0
while context.is_active():
print("Sending {}".format(i))
response = heartbeat_pb2.Counter(counter=i)
i = 1
time.sleep(1) # Simulate doing work
yield response
return response_message()
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
heartbeat_pb2_grpc.add_HeartBeatServicer_to_server(
HeartBeatServicer(), server)
server.add_insecure_port('[::]:9999')
server.start()
server.wait_for_termination()
if __name__ == '__main__':
serve()
Комментарии:
1. Особая благодарность Nordine Lotfi (пользователь 12349101) и MisterMiyagi (пользователь 5349916) за их помощь в чате SO Python за их помощь.
Ответ №2:
_Rendezvous
Объект, возвращаемый вызовом rpc, реализует grpc.RpcError
, grpc.Future
и grpc.Call
, поэтому отменить поток так же просто, как вызвать stream.cancel
(из grpc.Future
интерфейса)