Использование нового потока вместо задачи.Запуск при BlockingCollection

#.net #multithreading #async-await

#.net #многопоточность #асинхронный -ожидание

Вопрос:

В нашем коде иногда возникают проблемы с возвратом

У меня есть приложение-производитель / потребитель, которое запускает несколько потребителей, ожидающих BlockingCollection (именованная очередь).

   for (var i = 0; i < 10; i  )
    {

    var t = Task.Factory.StartNew(async () =>
     {
     try
      {
           await InboundQueue(stoppingToken.Value, queueProcessorId).ConfigureAwait(false);
                               
      }
      catch (OperationCanceledException ocex)
      {
                                
      }
      catch (Exception ex)
      {
                                
      }
      }, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach);
 

Каждый потребитель блокируется в методе TryTake перед обработкой ProcessMessage.

 if (!_queue.TryTake(out var message, timeToWait, cancellationToken))
{
..
await processMessage((T)message).ConfigureAwait(false);
..
}
 

В ProcessMessage мы несколько раз вызываем базу данных, выполняющую async / await, вызывая httpclient с помощью async / await и даже выполняя в некоторых случаях Task.Запускайте, чтобы что-то делать, а не ждать завершения.

Бывает довольно обидно, что для завершения await требуется некоторое время. Мы добавили в некоторый код внутри ProcessMessage дополнительный ConfigureAwait(false), и это помогло в некоторых случаях. По-прежнему происходит медленное ожидание.

https://github.com/davidfowl/AspNetCoreDiagnosticScenarios/blob/master/AsyncGuidance.md#avoid-using-taskrun-for-long-running-work-that-blocks-the-thread Читая этот пост, мы изменим код, чтобы использовать «новый поток» вместо «Задача.Factory.StartNew».

1. Поскольку у нас будет стандартный поток, созданный для запуска потребителей, нужен ли нам внутри метода processMesssage дополнительный ConfigureAwait(false), вызывающий асинхронный httpclient или database?

2.ProcessMessage имеет собственный поток и может быть легко синхронизирован. Можем ли мы безопасно выполнить .Result для библиотек, которые запрашивают асинхронный вызов?

3. Как мы можем отладить эту ситуацию? Примечание: Отслеживая сеть, мы обнаружили, что вызовы https и вызовы базы данных всегда выполняются быстро.

Заранее спасибо.

Комментарии:

1. Соберите трассировку ETW с помощью PerfView и проанализируйте ее, чтобы увидеть, где время тратится впустую.

2. @DarkoZa Инструмент .NET Async для профилирования производительности и визуализатор параллелизма могут стать вашими лучшими друзьями.

3. Мы хотели бы проанализировать производственную среду, и у нас там нет установок Visual studio.

Ответ №1:

В исходном коде есть проблема: StartNew не понимает async код. В частности, LongRunning флаг там в значительной степени бессмыслен, потому что он применяется только к части кода, пока не попадет в первую await .

У меня есть приложение-производитель / потребитель, которое запускает несколько потребителей, ожидающих BlockingCollection (именованная очередь).

Каждый потребитель блокируется в методе TryTake перед обработкой ProcessMessage.

Таким образом, вы в конечном итоге блокируете кучу потоков пула потоков, и я подозреваю, что именно это на самом деле вызывает вашу проблему (особенно, если этот шаблон повторяется в другом месте).

Бывает довольно обидно, что для завершения await требуется некоторое время.

Это звучит как исчерпание пула потоков, что может произойти, если потоки пула потоков используются для блокировки на регулярной основе.

Мы добавили в некоторый код внутри ProcessMessage дополнительный ConfigureAwait(false), и это помогло в некоторых случаях.

Сомнительно. На потоки пула потоков ConfigureAwait(false) это не влияет.

https://github.com/davidfowl/AspNetCoreDiagnosticScenarios/blob/master/AsyncGuidance.md#avoid-using-taskrun-for-long-running-work-that-blocks-the-thread Читая этот пост, мы изменим код, чтобы использовать «новый поток» вместо «Задача.Factory.StartNew».

Что ж, важно понимать контекст и почему исходный код плохой. (Примечание сбоку: Дэвид указывает: «Не используйте TaskCreationOptions.LongRunning с async кодом, так как это создаст новый поток, который будет уничтожен после первого await «, что является причиной моего первого абзаца выше).

Проблема в том, что потоки пула потоков регулярно блокируются. Если при запуске это всего лишь пара потоков, я лично подумал бы, что все в порядке, и просто разрешил пулу потоков вводить замены. Но если код регулярно блокирует потоки пула потоков, это проблема и может привести к исчерпанию пула потоков.

Теперь, что касается решений, одним из решений действительно является использование ручных потоков, но это не то решение, которое я бы рекомендовал. IMO гораздо лучшее решение — удалить блокировку.

В частности, замените BlockingCollection<T> очередь на что async -то вроде дружественного System.Threading.Channels . Или, если вас интересует несколько больший рефакторинг, загляните в поток данных TPL.

Удаление блокировки — гораздо лучшее решение, чем добавление ручных потоков IMO. Тем не менее, чтобы ответить на ваши конкретные вопросы:

Поскольку у нас будет стандартный поток, созданный для запуска потребителей, нужен ли нам внутри метода processMesssage дополнительный ConfigureAwait(false), вызывающий асинхронный httpclient или database?

Для ручного потока — точно так же, как поток пула потоков — ConfigureAwait(false) бессмысленно.

ProcessMessage имеет собственный поток и может быть легко синхронизирован.

На самом деле это должно быть синхронно. Пользовательские потоки не могут быть асинхронными (поток завершается первым await ). Если вы не установите однопоточный контекст, но тогда ConfigureAwait(false) это имеет смысл. И это становится довольно сложным.

Можем ли мы безопасно выполнить .Result для библиотек, которые запрашивают асинхронный вызов?

Это зависит от библиотеки. Но они, вероятно, были бы в порядке. Примечание: GetAwaiter().GetResult() похоже Result , за исключением того, что он не переносит исключения в AggregateException .

Как мы можем отладить эту ситуацию?

Это хороший вопрос. Эта статья может помочь; по сути, провести анализ производительности и искать регулярные события внедрения потоков. Затем вы можете найти часто блокирующий API, сделав снимок и изучив стеки потоков.

Комментарии:

1. Вы предлагаете заменить задачу. Factory.StartNew с Task.Run?

2. Если вам нужно поставить работу в очередь в поток пула потоков, тогда да.

3. Нам нужно, чтобы транзакции выполнялись параллельно.

4. Я считаю, что вам нужен параллелизм ; Я не вижу необходимости в параллелизме. Вы можете получить асинхронный параллелизм, просто используя async методы (с Task.WhenAll помощью, если вам нужно объединить результаты).