Блокировка обработки потока данных TPL

#c# #task-parallel-library #dataflow

#c# #задача-параллельная-библиотека #поток данных

Вопрос:

Я подписан на поток данных в реальном времени и поддерживаю состояние на основе полученных данных. Обычно все данные принимаются по порядку, но в случае, когда сообщение отбрасывается, я буферизую сообщения, получаю снимок состояния через REST API, а затем воспроизводю буфер, пропуская любые сообщения с идентификатором, предшествующим указанному в снимке. В настоящее время я делаю следующее:

 class StateManager
{
  private long _lastId;
  private bool _isSyncing;
  private object _syncLock;

  private Dictionary<decimal,decimal> _state;  
  private ConcurrentQueue<SocketMessage> _messageBuffer;

  private ManualResetEvent _messageEvent;
  private ManualResetEvent _processingEvent;

  public StateManager( DataSocket socket )
  {
    _isSyncing = false;
    _syncLock = new object();

    _state = new Dictionary<decimal,decimal>();
    _messageBuffer = new ConcurrentQueue<SocketMessage>();

    socket.OnMessage  = OnSocketMessage;
    Task.Factory.StartNew( MessageProcessingThread, TaskCreationOptions.LongRunning );
  }

  public void ApplySnapshot( Snapshot snapshot )
  {
    lock( _syncLock )
    {
      if( _isSyncing ) return;

      _isSyncing = true;
      _processingEvent.Reset();
    }

    // Apply the snapshot to the state...

    _isSyncing = false;
    _processingEvent.Set();
  }

  private void OnSocketMessage( object sender, SocketMessage msg )
  {
    _messageBuffer.Enqueue( msg );
    _messageEvent.Set();
  }

  private async Task MessageProcessingThread()
  {
    while(true)
    {
      _messageEvent.WaitOne();
      while(true)
      {
        _processingEvent.WaitOne();
        if( !_messageBuffer.TryDequeue( out var msg ) )
        {
          _messageEvent.Reset();
          break;
        }
        ApplyToState( msg );
      }

    }
  }
}
  

Это работает нормально, но я чувствую, что это немного неаккуратно и могло бы работать лучше при больших нагрузках. Таким образом, я рассматриваю переход на Microsoft.Tpl.Dataflow , который будет обрабатывать постановку в очередь и выполнение обработки для меня. Однако я уже использовал поток данных раньше, и у меня есть беспокойство:

Есть ли способ приостановить выполнение ActionBlock таким образом, чтобы оно буферизовало новые задачи, но не обрабатывало их, пока я не возобновлю? В случае, когда я обнаруживаю удаленное сообщение, мне нужно иметь возможность приостановить обработку до тех пор, пока не будет применен новый снимок, а затем возобновить и обработать все буферизованные сообщения.

Я мог бы просто использовать _processingEvent внутри ActionBlock , но я чувствую, что это вызвало бы кучу проблем. Во-первых, это заблокировало бы задачу, что привело бы к запуску большего количества задач, и они заблокировались бы, быстро заполнив внутреннюю очередь задач TPL. Кроме того, это привело бы к одновременному завершению всех заблокированных задач, возможно, не по порядку, что привело бы к возникновению другого события повторной синхронизации.

Если это невозможно с помощью TPL, есть ли лучший способ сделать это?

Комментарии:

1. Я бы пошел с задачами. Оберните очередь методом async GetItem, который MessageProcessing может ожидать и контролировать, когда он вернется, например, через TaskCompletionSource. Затем, когда вы дойдете до того, что вам нужно обновить состояние, просто не возвращайте сообщения. Примените новое состояние, отбросьте старые сообщения и начните возвращать новые. async / await довольно хорош с производительностью, и гораздо проще разобраться с этим. Кроме того, не забудьте добавить отмену для полного завершения работы