#.net #.net-core #async-await #task-parallel-library #producer-consumer
#.net #.net-core #асинхронное ожидание #задача-параллельная-библиотека #производитель-потребитель
Вопрос:
Контекст
В моем консольном приложении .NET 5 мой потребитель запускается как поток (задача). В настоящее время существует только один, но в будущем возможно одновременное использование пользователей в одном и том же экземпляре BufferBlock.
Текущая реализация работает нормально. Я застрял с реализацией следующей добавленной функции:
Я хотел бы просыпаться в настроенный период, даже если буфер пуст, и все равно оставаться в основном цикле. Этот вариант использования должен отличаться от варианта пробуждения по IsCancellationRequested токена, но необязательно отличать его от «продукта, доступного для использования», не обязательно.
Я вижу перегрузку ReceiveAsync
с таймаутом, но неясно, как OutputAvailableAsync
это происходит, которая не принимает тайм-аут.
Вопрос
Как реализовать пробуждение в течение заданного периода времени и оставаться в цикле. Только в том случае, если IsCancellationRequested должен прервать цикл
public class MyConsumer
{
private readonly BufferBlock<MyProduct> _products;
public void Start(CancellationToken token)
{
Task.Factory.StartNew(() => Run(token), token);
}
private async Task Run(CancellationToken token)
{
await ConsumeAsync(token);
}
private async Task ConsumeAsync(CancellationToken token)
{
while (await _products.OutputAvailableAsync(token))
{
var product = await _products.ReceiveAsync(token);
// Consume product goes here:...
// I would like wake up here in a configured period, even the buffer is empty.
//
// How to implement this timeout based wake up? (then still remain in the loop)
// I do not even understand clearly why are we using the two waiting operations,
// the 1) OutputAvailableAsync(token) then 2) ReceiveAsync(token)
}
}
}
Комментарии:
1. Что касается, я даже не совсем понимаю, почему мы используем две операции ожидания -> вы можете заменить
await _products.OutputAvailableAsync(token)
наwhile(true)
, потомуReceiveAsync
что будет ждать, пока не будет доступен вывод, но вам нужно проверить, завершен ли блок, и, как таковой, ничего не даст в будущем, самостоятельно.OutputAvailableAsync
упрощает выход, когда больше выходных данных никогда не будет доступно.2. Я хотел бы просыпаться здесь в настроенный период, даже если буфер пуст. Я не понимаю варианта использования. Если буфер пуст, продукт недоступен. Разве вы не можете создать отдельный метод, выполняющий все, что вы хотите, и вызывать этот метод в нужное время?
Ответ №1:
Я не очень хорошо знаком с BufferBlock
, но в общем смысле: если async
API не предоставляет тайм-аут, вы можете имитировать то же самое с помощью токена отмены с тайм-аутом — и при этом уважать существующий токен:
using var cts = new CancellationTokenSource();
// link the existing CancellationToken so that *it* can propagate cancellation
using var linked = token.Register(
static s => ((CancellationTokenSource)s).Cancel(), cts, false);
// add a timeout
cts.CancelAfter(yourTimeoutHere);
// use this new combined token to do the magic
await DoSomethingAsync(cts.Token);
(если вы не используете C # 9 или выше, удалите static
; это просто проверяет, что мы не вызываем дополнительных выделений из-за захваченных переменных в обратном вызове)
Как только у вас это будет на месте, вы можете просто реагировать на свой тайм-аут как на дополнительную работу, перехватывать OperationCanceledException
и делать то, что вам нужно. Небольшое предостережение заключается в знании того, в каком состоянии вещи останутся в DoSomethingAsync
случае отмены. Это будет зависеть от конкретного сценария.
Комментарии:
1. Я не думаю, что если асинхронный API не предоставляет тайм-аут , то это тот случай. Он поддерживает тайм-ауты. Похоже, что фактический вопрос заключается в том, как можно ввести метод, пока метод ожидает вывода из буферного блока.
2. @PeterBons возможно, я был недостаточно ясен; Я добавлю абзац…
3. Спасибо, я думаю, это значительно улучшает ответ.
4. @PeterBons the . OutputAvailableAsync (токен) не предоставляет тайм-аут. Поток управления является асинхронным и ожидает там, пока не произойдет отмена или элемент не будет доступен в буфере.