Производитель / потребитель с BufferBlock, как периодически просыпаться, даже если в буфере нет продукта?

#.net #.net-core #async-await #task-parallel-library #producer-consumer

#.net #.net-core #асинхронное ожидание #задача-параллельная-библиотека #производитель-потребитель

Вопрос:

Контекст

В моем консольном приложении .NET 5 мой потребитель запускается как поток (задача). В настоящее время существует только один, но в будущем возможно одновременное использование пользователей в одном и том же экземпляре BufferBlock.

Текущая реализация работает нормально. Я застрял с реализацией следующей добавленной функции:

Я хотел бы просыпаться в настроенный период, даже если буфер пуст, и все равно оставаться в основном цикле. Этот вариант использования должен отличаться от варианта пробуждения по IsCancellationRequested токена, но необязательно отличать его от «продукта, доступного для использования», не обязательно.

Я вижу перегрузку ReceiveAsync с таймаутом, но неясно, как OutputAvailableAsync это происходит, которая не принимает тайм-аут.

Вопрос

Как реализовать пробуждение в течение заданного периода времени и оставаться в цикле. Только в том случае, если IsCancellationRequested должен прервать цикл

 public class MyConsumer
{
    private readonly BufferBlock<MyProduct> _products;

    public void Start(CancellationToken token)
    {
        Task.Factory.StartNew(() => Run(token), token);
    }

    private async Task Run(CancellationToken token)
    {
        await ConsumeAsync(token);
    }

    private async Task ConsumeAsync(CancellationToken token)
    {
        while (await _products.OutputAvailableAsync(token))
        {
            var product = await _products.ReceiveAsync(token);
            // Consume product goes here:...

            // I would like wake up here in a configured period, even the buffer is empty.
            //
            // How to implement this timeout based wake up? (then still remain in the loop)
            // I do not even understand clearly why are we using the two waiting operations, 
            // the 1) OutputAvailableAsync(token) then 2) ReceiveAsync(token)
        }
    }
}
 

Комментарии:

1. Что касается, я даже не совсем понимаю, почему мы используем две операции ожидания -> вы можете заменить await _products.OutputAvailableAsync(token) на while(true) , потому ReceiveAsync что будет ждать, пока не будет доступен вывод, но вам нужно проверить, завершен ли блок, и, как таковой, ничего не даст в будущем, самостоятельно. OutputAvailableAsync упрощает выход, когда больше выходных данных никогда не будет доступно.

2. Я хотел бы просыпаться здесь в настроенный период, даже если буфер пуст. Я не понимаю варианта использования. Если буфер пуст, продукт недоступен. Разве вы не можете создать отдельный метод, выполняющий все, что вы хотите, и вызывать этот метод в нужное время?

Ответ №1:

Я не очень хорошо знаком с BufferBlock , но в общем смысле: если async API не предоставляет тайм-аут, вы можете имитировать то же самое с помощью токена отмены с тайм-аутом — и при этом уважать существующий токен:

 using var cts = new CancellationTokenSource();
// link the existing CancellationToken so that *it* can propagate cancellation
using var linked = token.Register(
    static s => ((CancellationTokenSource)s).Cancel(), cts, false);
// add a timeout
cts.CancelAfter(yourTimeoutHere);
// use this new combined token to do the magic
await DoSomethingAsync(cts.Token);
 

(если вы не используете C # 9 или выше, удалите static ; это просто проверяет, что мы не вызываем дополнительных выделений из-за захваченных переменных в обратном вызове)


Как только у вас это будет на месте, вы можете просто реагировать на свой тайм-аут как на дополнительную работу, перехватывать OperationCanceledException и делать то, что вам нужно. Небольшое предостережение заключается в знании того, в каком состоянии вещи останутся в DoSomethingAsync случае отмены. Это будет зависеть от конкретного сценария.

Комментарии:

1. Я не думаю, что если асинхронный API не предоставляет тайм-аут , то это тот случай. Он поддерживает тайм-ауты. Похоже, что фактический вопрос заключается в том, как можно ввести метод, пока метод ожидает вывода из буферного блока.

2. @PeterBons возможно, я был недостаточно ясен; Я добавлю абзац…

3. Спасибо, я думаю, это значительно улучшает ответ.

4. @PeterBons the . OutputAvailableAsync (токен) не предоставляет тайм-аут. Поток управления является асинхронным и ожидает там, пока не произойдет отмена или элемент не будет доступен в буфере.