Как буферизировать сообщения на signal hub и отправлять их при появлении нужного клиента?

#c# #asp.net-core #signalr #asp.net-core-signalr

#c# #asp.net-core #signalr #asp.net-core-signalr

Вопрос:

У меня есть два типа клиентов, подключающих мой сервер SignalR (ASP.NET Ядро). Некоторые из них являются отправителями, а некоторые — получателями. Мне нужно маршрутизировать сообщения от отправителей к получателям, что не является проблемой, но когда нет получателей, мне нужно как-то буферизировать сообщения и не терять их (вероятно, лучшим является ConcurrentQueue в каком-то одноэлементном классе), но при подключении первого получателя буфер сообщений должензапустите удаление из очереди. Какой наилучший подход для этого?

Я создал одноэлементный класс, который обертывает коллекцию ConcurrentQueue, и я ставлю в очередь и удаляю сообщения из очереди. Также у меня есть отдельный одноэлементный класс, в котором сохраняется коллекция идентификаторов соединений получателей. И я реализовал событие в этом втором классе, которое запускает событие, когда первый приемник подключается после того, как список получателей был пуст, но, возможно, это не очень хороший подход, я не знаю, как использовать id в Hub, потому что существует более одного экземпляра SignalR hub. Второй подход заключается в том, чтобы пометить класс persistence как controller и внедрить ContextHub и буфер сообщений в этот класс, а оттуда удалить буфер из очереди и напрямую отправлять сообщения получателям???

Комментарии:

1. Как я могу правильно внедрить IHubContext и два других одноэлементных класса в мой класс контроллера?

Ответ №1:

Если я правильно понял, вы хотите отложить отправку сообщения SignalR, используя что-то вроде синхронизированного вызова в каком-нибудь IHostedService. Вот чего мне удалось достичь до сих пор.

  • Ваш подход, который заключается в использовании ConcurrentQueue, который содержит вызываемые делегаты действий для обработки одновременных вызовов концентратора, является правильным. Как вы упомянули, он должен быть введен как одноэлементный.

Итак, здесь Queues класс:

 public class Queues {
    public ConcurrentQueue<Action<IHubContext<MyHub, IMyEvents>>> MessagesQueue { get; set; }
}
 
  • Теперь нам нужно захватить ConnectionId вызывающего абонента, чтобы вызов мог получить ответ позже. SendMessage поставьте в очередь необходимый делегат действия для выполнения вызова экземпляра концентратора в качестве параметра.

В качестве примера SendMessage будет запущен ответ вызывающему абоненту и BroadcastMessage отправлено сообщение всем клиентам.

Использование захваченного экземпляра концентратора вместо этого приведет к исключению здесь, потому что концентратор будет быстро удален. Вот почему он будет введен позже в другом классе. Посмотрите на SendMessage_BAD

Вот MyHub класс и соответствующий IMyEvents интерфейс:

 public interface IMyEvents {
    void ReceiveMessage(string myMessage);
}

public class MyHub : Hub<IMyEvents> {
    Queues queues;

    public MyHub(Queues queues) {
        this.queues = queues;
    }

    public void SendMessage(string message) {
        var callerId = Context.ConnectionId;
        queues.MessagesQueue.Enqueue(hub => hub.Clients.Client(callerId).ReceiveMessage(message));
    }

    // This will crash
    public void SendMessage_BAD(string message) {
        this.callerId = Context.ConnectionId;
        queues.MessagesQueue.Enqueue(_ => this.Clients.Client(callerId).ReceiveMessage(message));
    }

    public void BroadcastMessage(string message) {
        queues.MessagesQueue.Enqueue(hub => hub.Clients.All.ReceiveMessage(message));
    }
}
 
  • Теперь, используя наивный подход, этот код вызовет отправку сообщения отложенным способом. (На работе таймер обеспечивает регулярную частоту, а класс IHostedService — это, но он здесь не отображается). Этот класс должен быть введен как одноэлементный.

Здесь DeferredMessageSender класс:

 public class DeferredMessageSender {
    Queues queues;
    IHubContext<MyHub, IMyEvents> hub;

    public DeferredMessageSender(Queues queues, IHubContext<MyHub, IMyEvents> hub) {
        this.queues = queues;
        this.hub = hub;
    }

    public void GlobalSend() {
        while(queues.MessagesQueue.TryDequeue(out var evt)) {
            evt.Invoke(hub);
        }
    }
}
 

Надеюсь, это поможет.

Комментарии:

1. Откуда мне знать, что некоторые клиенты не получили сообщение, чтобы я мог повторить попытку позже? У меня есть очередь сообщений для клиентов, но я понятия не имею, как обеспечить доставку им того, что они пропустили после повторного подключения. Я не могу найти никаких ресурсов, у вас есть какие-нибудь идеи, как это сделать?

2. Это сложно. Мы используем обходной путь, который заключается в том, чтобы помещать каждое отправляемое нами сообщение в скользящий список с отметкой времени 1 (чтобы ограничить количество буферизуемых сообщений). Когда клиент отключается в первый раз, он получает временную метку, когда это происходит, затем он пытается повторно подключиться, пока ему не удастся это сделать. Затем он отправляет «getmissedmessage (временная метка)», поэтому сервер извлекает сообщения до этой временной метки из скользящего списка и повторно отправляет их только этому клиенту.