#c# #.net #task-parallel-library
#c# #.net #задача-параллельная-библиотека
Вопрос:
В моей таблице около 3 миллионов строк. У меня есть консольное приложение для получения всех строк и обработки этих строк. Я хочу использовать TPL для одновременной выборки 1000 строк и выполнения моей логики обработки. У меня может быть следующая логика: внутри метода ProcessRowsForPage я получу записи на основе номера страницы.
int totalRecordsCount = GetCount();
int pagecount = totalRecordsCount/1000;
for (int j= 0; j <= pagecount; j )
{
var pageNo= j;
var t = Task.Factory.StartNew(() =>
{
ProcessRowsForPage(pageNo);
});
tasks.Add(t);
}
Может быть, это странно, но есть ли способ создавать задачи без общего количества. Я хочу использовать что-то вроде цикла do while и прекратить создание задач, когда больше не нужно извлекать строки
Комментарии:
1. Как задача узнает, что она достигла последней страницы, если нет подсчета? Он должен был бы задать предыдущую задачу, но предыдущая задача, возможно, еще не была завершена.
2. Да, верно. Я думаю, что, если я не знаю общего количества, я думаю, что невозможно создать несколько задач.
3. Потенциально вы могли бы использовать ConcurrentQueue и удаление из очереди 1000 одновременно.
Ответ №1:
Для такого рода ситуаций вам лучше TPL Dataflow
.
Для этого вам понадобятся следующие компоненты:
- a
SqlDataReader
или что-то другое, что может передавать данные из базы данных - a
BatchBlock
сBatchSize = 1000
ActionBlock
это вызоветProcessRows
метод
Теперь, чтобы создать конвейер обработки, свяжите блоки вместе:
batchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });
После этого из ваших dataReader
Post
строк в BatchBlock
:
while(reader.Read())
{
var item = ConvertRow(reader);
batchBlock.Post(item);
}
// When you get here you've read all the data from the database
// tell the pipeline that no more data is coming
batchBlock.Complete();
И это позаботится об обработке. Если вы хотите получать уведомления, когда конвейер завершит обработку всех элементов, используйте Completion
свойство ActionBlock
to получать уведомления.
actionBlock.Completion.ContinueWith(prev => {Console.WriteLine("Finished.");}).
Ответ №2:
Вы могли бы сделать это, если вместо того, чтобы создавать потенциально миллионы задач, что является плохой идеей, если вы используете какой-то пул.
Создайте 3 (например) задачи в массиве и запустите их все.
Когда одна задача завершится, если строк будет больше, запустите ее снова.
Как только задача не возвращает больше данных, прекратите ее запуск, дождитесь завершения всех задач, и все готово.
Пример:
TASK1 > GetNext100Rows(0)
TASK2 > GetNext100Rows(100)
TASK3 > GetNext100Rows(200)
Если задача 2 завершится первой, перезапустите ее:
TASK1 > GetNext100Rows(0) [Processing]
TASK2 > GetNext100Rows(300) [Processing]
TASK3 > GetNext100Rows(200) [Processing]
Продолжайте перезапускать все завершенные задачи и каждый раз увеличивать его на 100.
Наконец, когда задача не возвращает больше данных, дождитесь завершения всех оставшихся потоков.
Для этого требуется, чтобы ваша задача могла возвращать или указывать, что у нее больше нет данных, например, путем установки переменной флага или в возвращаемом объекте.