#c# #task-parallel-library #dataflow
#c# #задача-параллельная-библиотека #поток данных
Вопрос:
Я подписан на поток данных в реальном времени и поддерживаю состояние на основе полученных данных. Обычно все данные принимаются по порядку, но в случае, когда сообщение отбрасывается, я буферизую сообщения, получаю снимок состояния через REST API, а затем воспроизводю буфер, пропуская любые сообщения с идентификатором, предшествующим указанному в снимке. В настоящее время я делаю следующее:
class StateManager
{
private long _lastId;
private bool _isSyncing;
private object _syncLock;
private Dictionary<decimal,decimal> _state;
private ConcurrentQueue<SocketMessage> _messageBuffer;
private ManualResetEvent _messageEvent;
private ManualResetEvent _processingEvent;
public StateManager( DataSocket socket )
{
_isSyncing = false;
_syncLock = new object();
_state = new Dictionary<decimal,decimal>();
_messageBuffer = new ConcurrentQueue<SocketMessage>();
socket.OnMessage = OnSocketMessage;
Task.Factory.StartNew( MessageProcessingThread, TaskCreationOptions.LongRunning );
}
public void ApplySnapshot( Snapshot snapshot )
{
lock( _syncLock )
{
if( _isSyncing ) return;
_isSyncing = true;
_processingEvent.Reset();
}
// Apply the snapshot to the state...
_isSyncing = false;
_processingEvent.Set();
}
private void OnSocketMessage( object sender, SocketMessage msg )
{
_messageBuffer.Enqueue( msg );
_messageEvent.Set();
}
private async Task MessageProcessingThread()
{
while(true)
{
_messageEvent.WaitOne();
while(true)
{
_processingEvent.WaitOne();
if( !_messageBuffer.TryDequeue( out var msg ) )
{
_messageEvent.Reset();
break;
}
ApplyToState( msg );
}
}
}
}
Это работает нормально, но я чувствую, что это немного неаккуратно и могло бы работать лучше при больших нагрузках. Таким образом, я рассматриваю переход на Microsoft.Tpl.Dataflow
, который будет обрабатывать постановку в очередь и выполнение обработки для меня. Однако я уже использовал поток данных раньше, и у меня есть беспокойство:
Есть ли способ приостановить выполнение ActionBlock
таким образом, чтобы оно буферизовало новые задачи, но не обрабатывало их, пока я не возобновлю? В случае, когда я обнаруживаю удаленное сообщение, мне нужно иметь возможность приостановить обработку до тех пор, пока не будет применен новый снимок, а затем возобновить и обработать все буферизованные сообщения.
Я мог бы просто использовать _processingEvent
внутри ActionBlock
, но я чувствую, что это вызвало бы кучу проблем. Во-первых, это заблокировало бы задачу, что привело бы к запуску большего количества задач, и они заблокировались бы, быстро заполнив внутреннюю очередь задач TPL. Кроме того, это привело бы к одновременному завершению всех заблокированных задач, возможно, не по порядку, что привело бы к возникновению другого события повторной синхронизации.
Если это невозможно с помощью TPL, есть ли лучший способ сделать это?
Комментарии:
1. Я бы пошел с задачами. Оберните очередь методом async GetItem, который MessageProcessing может ожидать и контролировать, когда он вернется, например, через TaskCompletionSource. Затем, когда вы дойдете до того, что вам нужно обновить состояние, просто не возвращайте сообщения. Примените новое состояние, отбросьте старые сообщения и начните возвращать новые. async / await довольно хорош с производительностью, и гораздо проще разобраться с этим. Кроме того, не забудьте добавить отмену для полного завершения работы