Предоставление push-сообщения через посредника сообщений обратного вызова в качестве IAsyncEnumerable

#c# #asynccallback #iasyncenumerable #system.threading.channels

Вопрос:

Я работаю со сторонней библиотекой, которая действует как интерфейс для брокера сообщений pub-sub. Брокер-Solace PubSub .

Для подписчиков библиотека поставщиков использует шаблон «отправка сообщений с помощью обратного вызова».

Я пишу свою собственную библиотеку-оболочку вокруг библиотеки поставщиков, чтобы облегчить работу с ней другим разработчикам (скрывая все внутренние компоненты того, как библиотека взаимодействует с сетью и так далее).

В том же духе я думаю, что было бы полезно предоставить канал подписчика в качестве IAsyncEnumerable , и я думаю, что это может быть хорошим вариантом использования System.Threading.Channels . У меня есть две проблемы:

  1. Подходят ли здесь каналы, или я переоцениваю это? Т. Е., Существует ли более «идиоматический» способ переноса обратных вызовов на C#?
  2. Безопасна ли моя EnumerableBroker реализация оболочки, или я где — то попал в асинхронную ловушку?

Я понимаю, что первый вопрос может лучше подходить для просмотра кода, чем ТАК, но, поскольку ответ на него также связан со второй проблемой, кажется целесообразным объединить их вместе. Стоит отметить: я избегаю IObservable / Rx, так как моя цель-сделать свой интерфейс более простым, чем у поставщика, а не требовать, чтобы другие разработчики и я изучали Rx! Понимание того, как процессы производителя и потребителя независимы, также тривиально с каналом посередине, в то время как при наблюдаемом моем первом мыслительном процессе «Хорошо, так производитель и потребитель все еще независимы? На первый взгляд кажется, что теперь мне нужно узнать о планировщиках… Боже, как насчет того, чтобы я просто использовал an await foreach

Вот минимальный макет потребления сообщений без EnumerableBroker :

 // mockup of third party class
private class Broker
{
    // mockup of how the third party library pushes messages via callback
    public void Subscribe(EventHandler<int> handler) => this.handler = handler;

    //simulate the broker pushing messages. Not "real" code
    public void Start()
    {
        Task.Run
        (
            () =>
            {
                for (int i = 0; !cts.Token.IsCancellationRequested; i  )
                {
                    // simulate internal latency
                    Thread.Sleep(10);
                    handler?.Invoke(this, i);
                }
            }, cts.Token
        );
    }

    public void Stop() => cts.Cancel();

    private CancellationTokenSource cts = new();
    private EventHandler<int> handler;
}

private static async Task Main()
{
    var broker = new Broker();
    broker.Subscribe((_, msg) => Console.WriteLine(msg));
    broker.Start();
    await Task.Delay(1000);
    broker.Stop();
}
 

И теперь с минимальным воспроизведением EnumerableBroker (все еще используя тот же макет Broker класса, указанный выше). По крайней мере, одно преимущество здесь, по — видимому, заключается в том, что, если подписчику нужно выполнить много работы для обработки сообщения, это не связывает поток брокера-по крайней мере, до тех пор, пока буфер не заполнится. Кажется, это работает без ошибок, но я научился остерегаться своего ограниченного понимания асинхронности.

 private class EnumerableBroker
{
    public EnumerableBroker(int bufferSize = 8)
    {
        buffer = Channel.CreateBounded<int>
        (
            new BoundedChannelOptions(bufferSize) { SingleReader = true,
                SingleWriter = true }
        );
    }

    public IAsyncEnumerable<int> ReadAsync(CancellationToken ct)
    {
        broker.Subscribe
        (
            // switched to sync per Theodor's comments
            (_, args) => buffer.Writer.WriteAsync(args, ct).AsTask().Wait()
        );
        ct.Register(broker.Stop);
        broker.Start();
        return buffer.Reader.ReadAllAsync(ct);
    }

    private readonly Channel<int> buffer;
    private readonly Broker broker = new();
}

private static async Task Main()
{
    var cts = new CancellationTokenSource();
    var broker = new EnumerableBroker();
    cts.CancelAfter(1000);
    try
    {
        await foreach (var msg in broker.ReadAsync(cts.Token))
        {
            Console.WriteLine(msg);
        }
    }
    catch (OperationCanceledException) { }
}
 

Комментарии:

1. Просто добавляю комментарий здесь относительно концепций высокого уровня… Естественная абстракция для данных, основанных на push, является наблюдаемой. IAsyncEnumerable предназначен для асинхронных данных на основе вытягивания. Вот почему канал необходим: вам нужен буфер для перевода на основе push на основе pull. Именно поэтому вам также необходимо решить (или разрешить конечным пользователям настраивать) стратегию буферизации: количество элементов и то, как вести себя, когда буфер заполнен.

2. @StephenCleary Понял и согласился, наблюдаемое действительно кажется «очевидным» концептуальным отображением, но меня отпугнула «очевидная» сложность. Я мог бы ошибаться относительно реальной сложности, но использование IObservable , похоже, заставляет потребителей работать в терминах IObserver s, что является совершенно новой абстракцией для изучения, в то IAsyncEnumerable время как требуется только, чтобы они могли написать an await foreach .

3. @allmhuran это перепроектировано только в том смысле, что для использования каналов не требуется так много кода. Типичный шаблон заключается в использовании только методов , возвращающих значение a ChannelReader , при этом сам метод создает канал и владеет им.

Ответ №1:

Я что, переоцениваю это?

Нет. A Channel -это именно тот компонент, который вам нужен для реализации этой функции. Это довольно простой механизм. Это в основном асинхронная версия BlockingCollection<T> класса с некоторыми дополнительными функциями (например Completion , свойство) и необычным API ( Reader и Writer фасады).

Безопасна ли моя реализация оболочки EnumerableBroker, или я где — то попал в асинхронную ловушку?

Да, это ловушка, и вы попались в нее. SingleWriter = true Конфигурация означает, что в WriteAsync полете допускается одновременное выполнение не более одной операции. Перед выпуском следующего WriteAsync необходимо заполнить предыдущий. Подписавшись на broker с async void делегатом, вы создаете, по сути, отдельного автора (производителя) для каждого сообщения, отправленного брокером. Скорее всего, компонент пожалуется на это неправильное использование, бросив InvalidOperationException s или что-то в этом роде. Решение состоит в том, чтобы не переключаться на SingleWriter = false though. Это просто позволит обойти ограниченную пропускную способность Channel , создав внешнюю-и крайне неэффективную — очередь с сообщениями, которые не помещаются во внутреннюю очередь Channel . Решение состоит в том, чтобы переосмыслить свою стратегию буферизации. Если вы не можете позволить себе буферизировать неограниченное количество сообщений, вы должны либо удалить сообщения, либо создать исключение и убить потребителя. Вместо await buffer.Writer.WriteAsync этого лучше подавать канал синхронно bool accepted = buffer.Writer.TryWrite и предпринимать соответствующие действия в случае accepted false , если это так .

Еще одно соображение, которое вы должны иметь в виду, заключается в том, что ChannelReader.ReadAllAsync метод потребляет много времени. Это означает, что если у вас несколько читателей/потребителей одного и того же канала, каждое сообщение будет доставлено только одному из потребителей. Другими словами, каждый потребитель получит частичное подмножество сообщений канала. Вы должны сообщить об этом своим коллегам, потому что перечислять одно и то же IAsyncEnumerable<T> несколько раз довольно тривиально. В конце концов, an IAsyncEnumerable<T> -это не что иное, как фабрика IAsyncEnumerator<T> s.

Наконец, вместо того , чтобы контролировать срок действия каждой подписки с помощью a CancellationToken , вы можете облегчить жизнь своим коллегам, просто автоматически прекращая подписку, когда заканчивается перечисление an IAsyncEnumerator<T> . Когда await foreach цикл заканчивается каким-либо образом (например, путем break или исключением), связанный IAsyncEnumerator<T> объект автоматически удаляется. Язык C# ловко связал DisposeAsync вызов с finally блоком итератора, если блок try/finally обертывает цикл получения. Вы могли бы воспользоваться такой замечательной функцией, как эта:

 public async IAsyncEnumerable<int> ReadAsync(CancellationToken ct)
{
    broker.Subscribe
    (
        //...
    );
    broker.Start();
    try
    {
        await foreach (var msg in buffer.Reader.ReadAllAsync(ct))
        {
            yield return msg;
        }
    }
    finally
    {
        broker.Stop();
    }
}
 

Комментарии:

1. Какой фантастический ответ. Очень признателен. Однако у меня есть одна проблема: мне действительно нужно оказать давление на брокера, потому что это «гарантированный» конвейер сообщений. Другими словами, я действительно хочу, чтобы библиотека брокера перестала получать сообщения из сети… Я просто хочу ограничить это «бурными» периодами и не делать это типичным для каждого сообщения. Таким образом, удаление сообщений-это не вариант. Есть ли лучший способ позволить каналу применять противодавление? WaitToWriteAsync может быть? Или просто добавить .Wait() ?

2. Такс @allmhuran. Я не думаю, что ни WaitToWriteAsync то, ни Wait другое может решить эту проблему. Можно ли отказаться от подписки у брокера, когда количество буферизованных сообщений превышает некоторый лимит, и повторно подписаться позже?

3. Это возможно, но досадно неэффективно. Фактическая платформа брокера (в сети) имеет большой буфер хранения, позволяющий ей гарантировать доставку и сглаживать всплески. Вариант использования-это непрерывная служба, хотя ожидаемого «конца» потока сообщений не ожидается, поскольку он представляет собой компонент интеграции в реальном времени между корпоративными приложениями. Это также означает, что скорость создания сообщений совершенно непредсказуема, и я бы предпочел, чтобы поток обратного вызова библиотеки поставщика «зависал», пока локальный буфер истощен, чем отключаться и повторно подключаться, так как это может происходить часто.

4. @allmhuran в этом случае вы можете поэкспериментировать с синхронным Wait : buffer.Writer.WriteAsync(msg).AsTask().Wait(); это может сработать, если брокер выдает сообщения из одного потока. Это не сработает , если брокер выгружает вызов Subscribe делегата в ThreadPool или если он поддерживает свой собственный неограниченный буфер внутри.

5. Вас понял, я собирался прокомментировать свою правку, в которую я добавил AsTask().Wait() . Я могу настроить собственный размер внутреннего буфера библиотеки, чтобы эта часть была покрыта. Я не знаю, как они справляются со своими потоками, но я буду следить за этим. Еще раз спасибо тебе, ты замечательный человек.

Ответ №2:

Это перепроектировано только в том смысле, что для использования каналов не требуется так много кода. Типичный шаблон заключается в использовании только методов, которые принимают a ChannelReader в качестве входных данных и возвращают a ChannelReader в качестве выходных данных, при этом сам метод создает выходной канал и владеет им. Это очень упрощает создание этапов в конвейер, особенно если эти методы являются методами расширения.

В этом случае ваш код может быть переписан как :

 static ChannelReader<int> ToChannel(this Broker broker, 
    int limit,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<int>(limit);
    var writer=channel.Writer;

    broker.Subscribe((_, args) =>{
        writer.TryWrite(args, token);
    });
    token.Register(()=>writer.Complete());

    return channel;
}
 

Это приведет к потере всех сообщений, превышающих лимит. Если вы Broker понимаете Task s, вы могли бы использовать:

 broker.Subscribe(async (_, args) =>{
        await writer.WriteAsync(args, token);
    });
 

Если он не понимает задач, и вы не можете позволить себе что-либо потерять, возможно, лучшим решением было бы использовать неограниченный канал и обрабатывать паузу/возобновление на более позднем этапе. Вы уже задавали аналогичный вопрос.

В противном случае вам придется заблокировать обратный вызов:

 broker.Subscribe(async (_, args) =>{
       writer.WriteAsync(args, token).AsTask().Wait();
    });
 

Однако это не идеальное решение.

В обоих случаях вы можете использовать данные, созданные считывателем:

 var token=cts.Token;
var reader=broker.ToChannel(10,token);

await foreach(var item in reader.ReadAllAsync(token))
{
...
}
 

Комментарии:

1. Действительно, следующий вопрос возник после того, как я получил подтверждение от поставщика, что этот подход будет небезопасен для использования с их библиотекой, следовательно, необходимо эффективно использовать их функции приостановки и возобновления. Общая ДНК в ваших ответах здесь и там очевидна.