gRPC Поддерживает поток

#c# #.net #grpc

Вопрос:

Я экспериментирую с gRPC для длительной потоковой сессии, так как мне нужно гарантировать упорядочение сообщений от сервера к клиенту.

У меня есть следующее .прото:

 service Subscriber {
    rpc Subscribe(SubscriptionRequest) returns (stream SubscriberEvent);
}
 

Моя текущая служба (размещенная в ASP.NET / .NET 5.0) выглядит так:

 public class SubscriberService : Subscriber.SubscriberBase
{
    private readonly ILogger<SubscriberService> _logger;
    private readonly ConcurrentDictionary<string, IServerStreamWriter<SubscriberEvent>> _subscriptions = new();
    private int _messageCount = 0;
    private Timer _timer;

    public SubscriberService(ILogger<SubscriberService> logger)
    {
        _logger = logger;
        _timer = new Timer(o => TimerCallback(), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
    }

    private void TimerCallback()
    {
        Broadcast($"Current time is {DateTime.UtcNow}");
    }

    public override Task Subscribe(SubscriptionRequest request, IServerStreamWriter<SubscriberEvent> responseStream, ServerCallContext context)
    {
        _subscriptions.TryAdd(request.ClientId, responseStream);
        return responseStream.WriteAsync(new SubscriberEvent() {Id = 0, Message = "Subscribe successful"});
    }

    public void Broadcast(string message)
    {
        var count =   _messageCount;
        foreach (var sub in _subscriptions.Values)
        {
            sub.WriteAsync(new SubscriberEvent() { Id = count, Message = message });
        }
        _logger.LogInformation($"Broadcast message #{count}: {message}");
    }
}
 

Мой клиент получает только начальное сообщение «Подписаться успешно», но никогда не получает сообщения, вызванные таймером. Я не получаю никаких исключений при вызове WriteAsync.

Пытаюсь ли я использовать gRPC для чего-то, для чего он никогда не предназначался (замена SignalR/WebSocket), или я просто упускаю что-то очевидное?

Комментарии:

1. У меня нет опыта работы с gRPC в .NET. Из того, что я знаю, основываясь на использовании gRPC на других языках, когда вы возвращаетесь из функции, реализующей метод потоковой передачи, поток закрывается. Итак, когда вы возвращаетесь Subscribe , поток закрывается. Если вы попытаетесь напечатать ошибку , возвращенную из sub.WriteAsync in Broadcast , вы увидите сообщение об ошибке stream is closed .

2. @EaswarSwaminathan: Это, безусловно, объяснило бы это. WriteAsync не возвращает ничего, кроме ожидаемой задачи, и в журналах также нет ничего, что указывало бы на проблему. Я бы ожидал, что WriteAsync выдаст исключение, если поток был закрыт.

Ответ №1:

Для длительной потоковой передачи gRPC вам нужно дождаться, пока клиент сообщит, что соединение закрыто. Что-то вроде этого:

 while (!context.CancellationToken.IsCancellationRequested)
{
    // event-based action
    responseStream.WriteAsync(new SubscriberEvent() {Id = 0, Message = "Subscribe successful"});  
}
 

Комментарии:

1. Это действительно ответ. Это на мгновение сбило меня с толку 🙂

Ответ №2:

Я думаю, что предыдущий ответ был занят-подождите. Поэтому я хочу показать вам его асинхронную версию.

 public override Task Subscribe(RequestMessage, IServerStreamWriter<ReplyMessage> responseStream, ServerCallContext context)
 {
    // your event-based code here
    
    var tcs = new TaskCompletionSource();
    context.CancellationToken.Register(() => tcs.TrySetCanceled(), false);
    return tcs.Task;
 }
 

Кстати, я думаю, что занимался проектом, похожим на ваш. И я использовал Наблюдаемые объекты, Объекты и объекты воспроизведения из Rx.Net. Они очень полезны для кода, основанного на событиях.

Комментарии:

1. Спасибо, что ответили на этот вопрос. Я сделал цикл while, но с задачей ожидания. Задержка вместо принятого ответа. В итоге оказалось проще просто передать поток менеджеру подписки за пределами службы.