Как сбросить отложенное / отклоненное сообщение в потоке данных TPL

#c# #task-parallel-library #pipeline #tpl-dataflow

#c# #задача-параллельная-библиотека #конвейер #tpl-поток данных

Вопрос:

Я использую TDF для своего приложения, которое пока отлично работает, к сожалению, я наткнулся на конкретную проблему, когда кажется, что она не может быть обработана напрямую с помощью существующих механизмов потока данных:

У меня есть N производителей (в данном случае BufferBlocks), которые все связаны только с 1 (все к одному и тому же) ActionBlock. Этот блок всегда обрабатывает 1 элемент за раз, а также имеет емкость только для 1 элемента.

К ссылке от производителей на ActionBlock я также хочу добавить фильтр, но особый случай здесь заключается в том, что условие фильтра может изменяться независимо от обрабатываемого элемента, и элемент не должен быть отброшен! Итак, в основном я хочу обработать все элементы, но порядок / время могут измениться, когда элемент будет обработан.

К сожалению, я узнал, что если элемент «отклонен» один раз -> условие фильтра принимает значение false, и если этот элемент не передается в другой блок (например, NullTarget), целевой блок не повторяет попытку того же элемента (и не переоценивает фильтр).

 public class ConsumeTest
  {
    private readonly BufferBlock<int> m_bufferBlock1;
    private readonly BufferBlock<int> m_bufferBlock2;
    private readonly ActionBlock<int> m_actionBlock;

    public ConsumeTest()
    {
      m_bufferBlock1 = new BufferBlock<int>();
      m_bufferBlock2 = new BufferBlock<int>();

      var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 };
      m_actionBlock = new ActionBlock<int>((item) => BlockAction(item), options);

      var start = DateTime.Now;
      var elapsed = TimeSpan.FromMinutes(1);

      m_bufferBlock1.LinkTo(m_actionBlock, x => IsTimeElapsed(start, elapsed));
      m_bufferBlock2.LinkTo(m_actionBlock);

      FillBuffers();
    }

    private void BlockAction(int item)
    {
      Console.WriteLine(item);
      Thread.Sleep(2000);
    }

    private void FillBuffers()
    {
      for (int i = 0; i < 1000; i  )
      {
        if (i % 2 == 0)
        {
          m_bufferBlock1.Post(i);
        }
        else
        {
          m_bufferBlock2.Post(i);
        }
      }
    }

    private bool IsTimeElapsed(DateTime start, TimeSpan elapsed)
    {
      Console.WriteLine("checking time elapsed");
      return DateTime.Now > (start   elapsed);
    }

    public async Task Start()
    {
      await m_actionBlock.Completion;
    }
  }
  

Код настраивает конвейер тестирования и заполняет два буфера четными и нечетными числами. Оба буферных блока подключены к одному ActionBlock, который печатает только «обработанный» номер и ожидает 2 секунды.

Условие фильтрации между m_bufferBlock1 и m_actionBlock проверяет (в целях тестирования), прошла ли 1 минута с тех пор, как мы начали все это.

Если мы запустим это, он сгенерирует следующий вывод:

 1
checking time elapsed
3
5
7
9
11
13
15
17
19
  

Как мы можем видеть, ActionBlock берет первый элемент из BufferBlock без фильтра, затем пытается взять элемент из BufferBlock с фильтром. Фильтр принимает значение false и продолжает извлекать все элементы из блока без фильтра.

Мое ожидание состояло в том, что после обработки элемента из буферного блока без фильтра он снова попытается извлечь элемент из другого буферного блока с фильтром, снова оценив его.

Это было бы моим ожидаемым (или желаемым) результатом:

 1
checking time elapsed
3
checking time elapsed
5
checking time elapsed
7
checking time elapsed
9
checking time elapsed
11
checking time elapsed
13
checking time elapsed
15
// after timer has elapsed take elements also from other buffer
2
17
4
19
  

Теперь мой вопрос заключается в том, есть ли способ «сбросить» уже «отклоненное» сообщение, чтобы оно снова оценивалось, или есть другой способ, моделируя его по-другому? В общих чертах, НЕ важно, что они действительно извлекаются из обоих буферов строго поочередно! (потому что я знаю, что это зависит от расписания, и это совершенно нормально, если время от времени 2 элемента из одного и того же блока удаляются из очереди)
Но важно, чтобы сообщение «отклонено» не должно быть отброшено или повторно помещено в очередь, поскольку важен порядок внутри одного буфера.

Заранее благодарю вас

Ответ №1:

Одна из идей состоит в том, чтобы периодически или по требованию обновлять связь между двумя блоками. Реализовать периодически обновляемое LinkTo не очень сложно. Вот реализация:

 public static IDisposable LinkTo<TOutput>(this ISourceBlock<TOutput> source,
    ITargetBlock<TOutput> target, Predicate<TOutput> predicate,
    TimeSpan refreshInterval, DataflowLinkOptions linkOptions = null)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (target == null) throw new ArgumentNullException(nameof(target));
    if (predicate == null) throw new ArgumentNullException(nameof(predicate));
    if (refreshInterval < TimeSpan.Zero)
        throw new ArgumentOutOfRangeException(nameof(refreshInterval));
    linkOptions = linkOptions ?? new DataflowLinkOptions();

    var locker = new object();
    var cts = new CancellationTokenSource();
    var token = cts.Token;
    var currentLink = source.LinkTo(target, linkOptions, predicate);
    var loopTask = Task.Run(async () =>
    {
        try
        {
            while (true)
            {
                await Task.Delay(refreshInterval, token).ConfigureAwait(false);
                currentLink.Dispose();
                currentLink = source.LinkTo(target, linkOptions, predicate);
            }
        }
        finally
        {
            lock (locker) { cts.Dispose(); cts = null; }
        }
    }, token);

    _ = Task.Factory.ContinueWhenAny(new[] { source.Completion, target.Completion },
        _ => { lock (locker) cts?.Cancel(); }, token, TaskContinuationOptions.None,
        TaskScheduler.Default);

    return new Unlinker(() =>
    {
        lock (locker) cts?.Cancel();
        // Wait synchronously the task to complete, ignoring cancellation exceptions.
        try { loopTask.GetAwaiter().GetResult(); } catch (OperationCanceledException) { }
        currentLink.Dispose();
    });
}

private struct Unlinker : IDisposable
{
    private readonly Action _action;
    public Unlinker(Action disposeAction) => _action = disposeAction;
    void IDisposable.Dispose() => _action?.Invoke();
}
  

Пример использования:

 m_bufferBlock1.LinkTo(m_actionBlock, x => IsTimeElapsed(start, elapsed),
    refreshInterval: TimeSpan.FromSeconds(10));
  

Связь между m_bufferBlock1 и m_actionBlock будет обновляться каждые 10 секунд, пока не завершится один из двух блоков.