#c# #.net #grpc
Вопрос:
Я экспериментирую с gRPC для длительной потоковой сессии, так как мне нужно гарантировать упорядочение сообщений от сервера к клиенту.
У меня есть следующее .прото:
service Subscriber {
rpc Subscribe(SubscriptionRequest) returns (stream SubscriberEvent);
}
Моя текущая служба (размещенная в ASP.NET / .NET 5.0) выглядит так:
public class SubscriberService : Subscriber.SubscriberBase
{
private readonly ILogger<SubscriberService> _logger;
private readonly ConcurrentDictionary<string, IServerStreamWriter<SubscriberEvent>> _subscriptions = new();
private int _messageCount = 0;
private Timer _timer;
public SubscriberService(ILogger<SubscriberService> logger)
{
_logger = logger;
_timer = new Timer(o => TimerCallback(), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
}
private void TimerCallback()
{
Broadcast($"Current time is {DateTime.UtcNow}");
}
public override Task Subscribe(SubscriptionRequest request, IServerStreamWriter<SubscriberEvent> responseStream, ServerCallContext context)
{
_subscriptions.TryAdd(request.ClientId, responseStream);
return responseStream.WriteAsync(new SubscriberEvent() {Id = 0, Message = "Subscribe successful"});
}
public void Broadcast(string message)
{
var count = _messageCount;
foreach (var sub in _subscriptions.Values)
{
sub.WriteAsync(new SubscriberEvent() { Id = count, Message = message });
}
_logger.LogInformation($"Broadcast message #{count}: {message}");
}
}
Мой клиент получает только начальное сообщение «Подписаться успешно», но никогда не получает сообщения, вызванные таймером. Я не получаю никаких исключений при вызове WriteAsync.
Пытаюсь ли я использовать gRPC для чего-то, для чего он никогда не предназначался (замена SignalR/WebSocket), или я просто упускаю что-то очевидное?
Комментарии:
1. У меня нет опыта работы с gRPC в .NET. Из того, что я знаю, основываясь на использовании gRPC на других языках, когда вы возвращаетесь из функции, реализующей метод потоковой передачи, поток закрывается. Итак, когда вы возвращаетесь
Subscribe
, поток закрывается. Если вы попытаетесь напечатать ошибку , возвращенную изsub.WriteAsync
inBroadcast
, вы увидите сообщение об ошибкеstream is closed
.2. @EaswarSwaminathan: Это, безусловно, объяснило бы это. WriteAsync не возвращает ничего, кроме ожидаемой задачи, и в журналах также нет ничего, что указывало бы на проблему. Я бы ожидал, что WriteAsync выдаст исключение, если поток был закрыт.
Ответ №1:
Для длительной потоковой передачи gRPC вам нужно дождаться, пока клиент сообщит, что соединение закрыто. Что-то вроде этого:
while (!context.CancellationToken.IsCancellationRequested)
{
// event-based action
responseStream.WriteAsync(new SubscriberEvent() {Id = 0, Message = "Subscribe successful"});
}
Комментарии:
1. Это действительно ответ. Это на мгновение сбило меня с толку 🙂
Ответ №2:
Я думаю, что предыдущий ответ был занят-подождите. Поэтому я хочу показать вам его асинхронную версию.
public override Task Subscribe(RequestMessage, IServerStreamWriter<ReplyMessage> responseStream, ServerCallContext context)
{
// your event-based code here
var tcs = new TaskCompletionSource();
context.CancellationToken.Register(() => tcs.TrySetCanceled(), false);
return tcs.Task;
}
Кстати, я думаю, что занимался проектом, похожим на ваш. И я использовал Наблюдаемые объекты, Объекты и объекты воспроизведения из Rx.Net. Они очень полезны для кода, основанного на событиях.
Комментарии:
1. Спасибо, что ответили на этот вопрос. Я сделал цикл while, но с задачей ожидания. Задержка вместо принятого ответа. В итоге оказалось проще просто передать поток менеджеру подписки за пределами службы.