#c# #multithreading #postgresql #plinq #npgsql
#c# #многопоточность #postgresql #plinq #npgsql
Вопрос:
У меня есть следующий оператор PLINQ в программе C #:
foreach (ArrestRecord arrest in
from row in arrestQueue.AsParallel()
select row)
{
Geocoder geocodeThis = new Geocoder(arrest);
writeQueue.Enqueue(geocodeThis.Geocode());
Console.Out.WriteLine("Enqueued " k);
}
Оба arrestQueue
и writeQueue
являются ConcurrentQueues.
Ничего не выполняется параллельно:
- Во время выполнения общая загрузка процессора составляет около 30%, и это при том, что все остальное тоже запущено. У меня 8 ядер (гиперпоточность на Core i7 720QM с 4 физическими ядрами), и 4 из 8 ядер практически не используются вообще. Остальные выполняются примерно на 40-50%.
- Загрузка диска обычно равна 0%, а сеть не используется, за исключением запросов к базе данных Postgres на localhost (см. Ниже).
- Если я добавлю точку останова где-нибудь внутри
geocodeThis.Geocode()
, в выпадающем списке Thread Visual Studio просто будет указано [pid] Main Thread. Он никогда не переходит ни в какой другой поток. - Я использую Npgsql для подключения к Postgres, и каждый поток выполняет несколько запросов SELECT к таблице. Я запускаю приложение состояния сервера pgAdmin III, которое показывает pg_stat_activity. Благодаря мониторингу этого и стратегическому размещению точки останова (см. Выше), я вижу, что приложение никогда не имеет более 1 открытого подключения к базе данных для всех предположительно запущенных одновременно потоков
geocodeThis.Geocode()
. Даже если я добавлю Pooling = false в строку подключения к БД, чтобы принудительно не объединять соединения в пул, я никогда не увижу, что вgeocodeThis.Geocode()
используется более 1 соединения. - Таблица Postgres индексируется по каждому столбцу в предложении WHERE. Даже если бы он был плохо проиндексирован, я бы ожидал большого использования диска. Если бы Postgres поддерживал работу каким-либо другим способом, похоже, это поглотило бы ядро.
Это похоже на простой пример PLINQ, и я ломаю голову над тем, почему ничего не выполняется параллельно.
Ответ №1:
Вы распараллеливаете только перечисление assertQueue
самого IEnumerable
, а затем «непараллеливаете» его обратно в обычное, не имеющее аналогов, значение.,,. Все это происходит еще до того, как foreach
начнется цикл. Затем вы используете обычный IEnumerable
с foreach
, который последовательно запускает тело цикла.
Существует множество способов параллельного выполнения тела цикла, но первый, который приходит на ум, — это использование Parallel.ForEach
:
Parallel.ForEach(arrestQueue, arrest =>
{
Geocoder geocodeThis = new Geocoder(arrest);
writeQueue.Enqueue(geocodeThis.Geocode());
Console.Out.WriteLine("Enqueued " k);
});
Комментарии:
1. Там есть второе перечисление — я тоже пропустил его в первый раз. Но после добавления этого это тоже работает.
2. @Chris:
arrestQueue
имеет типConcurrentQueue<ArrestRecord>
record. Попробуйте скомпилировать свой (отредактированный) код и исходный код poster с использованием фиктивных типов.3. Подтверждаю, вы правы! Считайте это как второе перечисление. Отредактировано соответствующим образом.
4. @Chris: Это перечисление. Это просто перечисление
foreach (var item in input) yield return item;
разновидности. 🙂5. Спасибо!! Это сработало, но я в замешательстве: как мне выполнить параллелизацию в оригинальном PLINQ? Я вижу, вы используете библиотеку параллельных задач вместо PLINQ…
Ответ №2:
Выполнение каждого запроса через параллельную коллекцию по-прежнему является однопоточной операцией. .AsParallel возвращает коллекцию, которая определяет a .Метод ForAll, который может (но по контракту не всегда) выполняться параллельно. Код для этого был бы:
arrestQueue.AsParallel().ForAll(arrest=>
{
Geocoder geocodeThis = new Geocoder(arrest);
writeQueue.Enqueue(geocodeThis.Geocode());
Console.Out.WriteLine("Enqueued " k);
});