#c# #.net #asynchronous #task-parallel-library
#c# #.net #асинхронный #задача-параллельная-библиотека
Вопрос:
Почему параллельный.ForEach не завершит выполнение серии задач, пока MoveNext не вернет false?
У меня есть инструмент, который отслеживает комбинацию очередей MSMQ и Service Broker для входящих сообщений. Когда сообщение найдено, оно передает это сообщение соответствующему исполнителю.
Я завернул проверку сообщений в IEnumerable, чтобы я мог передать параллель.Для каждого метода IEnumerable плюс делегат для запуска. Приложение предназначено для непрерывной работы с IEnumerator.Перемещайте следующую обработку в цикле, пока она не сможет приступить к работе, затем IEnumerator.Текущий, дающий ему следующий элемент.
Поскольку MoveNext никогда не умрет, пока я не установлю для CancelToken значение true, это должно продолжаться вечно. Вместо этого я вижу, что когда-то параллельный.ForEach получил все сообщения, и MoveNext больше не возвращает «true», больше задачи не обрабатываются. Вместо этого кажется, что поток MoveNext является единственным потоком, которому выполняется какая-либо работа, пока он ожидает ее возврата, а другие потоки (включая ожидающие и запланированные потоки) не выполняют никакой работы.
- Есть ли способ сообщить параллели, чтобы она продолжала работать, пока она ожидает ответа от MoveNext?
- Если нет, есть ли другой способ структурировать MoveNext, чтобы получить то, что я хочу? (если оно возвращает true, а затем текущий объект, возвращающий нулевой объект, порождает множество фиктивных задач)
- Дополнительный вопрос: Есть ли способ ограничить количество сообщений, которые параллель отправляет одновременно? Кажется, что он выполняет и планирует множество сообщений одновременно (MaxDegreeOfParallelism, похоже, ограничивает только объем работы, которую он выполняет одновременно, это не мешает ему выполнять множество сообщений, которые должны быть запланированы)
Вот IEnumerator для того, что я написал (без некоторого постороннего кода):
public class DataAccessEnumerator : IEnumerator<TransportMessage>
{
public TransportMessage Current
{ get { return _currentMessage; } }
public bool MoveNext()
{
while (_cancelToken.IsCancellationRequested == false)
{
TransportMessage current;
foreach (var task in _tasks)
{
if (task.QueueType.ToUpper() == "MSMQ")
current = _msmq.Get(task.Name);
else
current = _serviceBroker.Get(task.Name);
if (current != null)
{
_currentMessage = current;
return true;
}
}
WaitHandle.WaitAny(new [] {_cancelToken.WaitHandle}, 500);
}
return false;
}
public DataAccessEnumerator(IDataAccess<TransportMessage> serviceBroker, IDataAccess<TransportMessage> msmq, IList<JobTask> tasks, CancellationToken cancelToken)
{
_serviceBroker = serviceBroker;
_msmq = msmq;
_tasks = tasks;
_cancelToken = cancelToken;
}
private readonly IDataAccess<TransportMessage> _serviceBroker;
private readonly IDataAccess<TransportMessage> _msmq;
private readonly IList<JobTask> _tasks;
private readonly CancellationToken _cancelToken;
private TransportMessage _currentMessage;
}
Вот параллель.Вызов ForEach, где _queueAccess — это IEnumerable, который содержит указанный выше IEnumerator, и runJob обрабатывает TransportMessage, возвращаемое из этого IEnumerator:
var parallelOptions = new ParallelOptions
{
CancellationToken = _cancelTokenSource.Token,
MaxDegreeOfParallelism = 8
};
Parallel.ForEach(_queueAccess, parallelOptions, x => RunJob(x));
Комментарии:
1. Возможно, реактивные расширения могли бы помочь? msdn.microsoft.com/en-us/data/gg577609
Ответ №1:
Мне кажется, что Parallel.ForEach
это не совсем подходит для того, что вы хотите сделать. Я предлагаю вам использовать BlockingCollection<T>
для создания очереди производителя / потребителя вместо этого — создайте кучу потоков / задач для обслуживания коллекции блокировки и добавляйте в нее рабочие элементы по мере их поступления.
Ответ №2:
Ваша проблема может быть связана с используемым разделителем.
В вашем случае TPL выберет разделитель блоков, который будет принимать несколько элементов из перечисления, прежде чем передавать их для обработки. Количество элементов, взятых в каждом блоке, будет увеличиваться со временем.
Когда ваш MoveNext
метод блокируется, TPL остается в ожидании следующего элемента и не обрабатывает элементы, которые он уже принял.
У вас есть несколько вариантов, чтобы исправить это:
1) Напишите разделитель, который всегда возвращает отдельные элементы. Не так сложно, как кажется.
2) Используйте TPL вместо Parallel.ForEach
:
foreach ( var item in _queueAccess )
{
var capturedItem = item;
Task.Factory.StartNew( () => RunJob( capturedItem ) );
}
Второе решение немного меняет поведение. foreach
Цикл завершится, когда все Tasks
будут созданы, а не когда они завершатся. Если это проблема для вас, вы можете добавить CountdownEvent
:
var ce = new CountdownEvent( 1 );
foreach ( var item in _queueAccess )
{
ce.AddCount();
var capturedItem = item;
Task.Factory.StartNew( () => { RunJob( capturedItem ); ce.Signal(); } );
}
ce.Signal();
ce.Wait();
Ответ №3:
Я не прилагал усилий, чтобы убедиться в этом, но впечатление, которое я получил от обсуждений Parallel.ForEach заключался в том, что он извлек бы все элементы из перечисляемых, чтобы они принимали соответствующие решения о том, как разделить их по потокам. Исходя из вашей проблемы, это кажется правильным.
Итак, чтобы сохранить большую часть вашего текущего кода, вам, вероятно, следует извлечь блокирующий код из итератора и поместить его в цикл вокруг вызова Parallel.ForEach (который использует итератор).