Параллельно.Каждому не удается выполнить сообщения при длительном запуске IEnumerable

#c# #.net #asynchronous #task-parallel-library

#c# #.net #асинхронный #задача-параллельная-библиотека

Вопрос:

Почему параллельный.ForEach не завершит выполнение серии задач, пока MoveNext не вернет false?

У меня есть инструмент, который отслеживает комбинацию очередей MSMQ и Service Broker для входящих сообщений. Когда сообщение найдено, оно передает это сообщение соответствующему исполнителю.

Я завернул проверку сообщений в IEnumerable, чтобы я мог передать параллель.Для каждого метода IEnumerable плюс делегат для запуска. Приложение предназначено для непрерывной работы с IEnumerator.Перемещайте следующую обработку в цикле, пока она не сможет приступить к работе, затем IEnumerator.Текущий, дающий ему следующий элемент.

Поскольку MoveNext никогда не умрет, пока я не установлю для CancelToken значение true, это должно продолжаться вечно. Вместо этого я вижу, что когда-то параллельный.ForEach получил все сообщения, и MoveNext больше не возвращает «true», больше задачи не обрабатываются. Вместо этого кажется, что поток MoveNext является единственным потоком, которому выполняется какая-либо работа, пока он ожидает ее возврата, а другие потоки (включая ожидающие и запланированные потоки) не выполняют никакой работы.

  • Есть ли способ сообщить параллели, чтобы она продолжала работать, пока она ожидает ответа от MoveNext?
  • Если нет, есть ли другой способ структурировать MoveNext, чтобы получить то, что я хочу? (если оно возвращает true, а затем текущий объект, возвращающий нулевой объект, порождает множество фиктивных задач)
  • Дополнительный вопрос: Есть ли способ ограничить количество сообщений, которые параллель отправляет одновременно? Кажется, что он выполняет и планирует множество сообщений одновременно (MaxDegreeOfParallelism, похоже, ограничивает только объем работы, которую он выполняет одновременно, это не мешает ему выполнять множество сообщений, которые должны быть запланированы)

Вот IEnumerator для того, что я написал (без некоторого постороннего кода):

 public class DataAccessEnumerator : IEnumerator<TransportMessage> 
{
    public TransportMessage Current
    {   get { return _currentMessage; } }

    public bool MoveNext()
    {
        while (_cancelToken.IsCancellationRequested == false)
        {
            TransportMessage current;
            foreach (var task in _tasks)
            {
                if (task.QueueType.ToUpper() == "MSMQ")
                    current = _msmq.Get(task.Name);
                else
                    current = _serviceBroker.Get(task.Name);

                if (current != null)
                {
                    _currentMessage = current;
                    return true;
                }
            }
            WaitHandle.WaitAny(new [] {_cancelToken.WaitHandle}, 500); 
        }

        return false; 
    }

    public DataAccessEnumerator(IDataAccess<TransportMessage> serviceBroker, IDataAccess<TransportMessage> msmq, IList<JobTask> tasks, CancellationToken cancelToken)
    {
        _serviceBroker = serviceBroker;
        _msmq = msmq;
        _tasks = tasks;
        _cancelToken = cancelToken;
    }

    private readonly IDataAccess<TransportMessage> _serviceBroker;
    private readonly IDataAccess<TransportMessage> _msmq;
    private readonly IList<JobTask> _tasks;
    private readonly CancellationToken _cancelToken;
    private TransportMessage _currentMessage;
}
  

Вот параллель.Вызов ForEach, где _queueAccess — это IEnumerable, который содержит указанный выше IEnumerator, и runJob обрабатывает TransportMessage, возвращаемое из этого IEnumerator:

 var parallelOptions = new ParallelOptions
    {
        CancellationToken = _cancelTokenSource.Token,
        MaxDegreeOfParallelism = 8 
    };

Parallel.ForEach(_queueAccess, parallelOptions, x => RunJob(x));
  

Комментарии:

1. Возможно, реактивные расширения могли бы помочь? msdn.microsoft.com/en-us/data/gg577609

Ответ №1:

Мне кажется, что Parallel.ForEach это не совсем подходит для того, что вы хотите сделать. Я предлагаю вам использовать BlockingCollection<T> для создания очереди производителя / потребителя вместо этого — создайте кучу потоков / задач для обслуживания коллекции блокировки и добавляйте в нее рабочие элементы по мере их поступления.

Ответ №2:

Ваша проблема может быть связана с используемым разделителем.

В вашем случае TPL выберет разделитель блоков, который будет принимать несколько элементов из перечисления, прежде чем передавать их для обработки. Количество элементов, взятых в каждом блоке, будет увеличиваться со временем.

Когда ваш MoveNext метод блокируется, TPL остается в ожидании следующего элемента и не обрабатывает элементы, которые он уже принял.

У вас есть несколько вариантов, чтобы исправить это:

1) Напишите разделитель, который всегда возвращает отдельные элементы. Не так сложно, как кажется.

2) Используйте TPL вместо Parallel.ForEach :

 foreach ( var item in _queueAccess )
{
    var capturedItem = item;

    Task.Factory.StartNew( () => RunJob( capturedItem ) );
}
  

Второе решение немного меняет поведение. foreach Цикл завершится, когда все Tasks будут созданы, а не когда они завершатся. Если это проблема для вас, вы можете добавить CountdownEvent :

 var ce = new CountdownEvent( 1 );

foreach ( var item in _queueAccess )
{
    ce.AddCount();

    var capturedItem = item;

    Task.Factory.StartNew( () => { RunJob( capturedItem ); ce.Signal(); } );
}

ce.Signal();
ce.Wait();
  

Ответ №3:

Я не прилагал усилий, чтобы убедиться в этом, но впечатление, которое я получил от обсуждений Parallel.ForEach заключался в том, что он извлек бы все элементы из перечисляемых, чтобы они принимали соответствующие решения о том, как разделить их по потокам. Исходя из вашей проблемы, это кажется правильным.

Итак, чтобы сохранить большую часть вашего текущего кода, вам, вероятно, следует извлечь блокирующий код из итератора и поместить его в цикл вокруг вызова Parallel.ForEach (который использует итератор).