PLINQ в ConcurrentQueue не является многопоточным

#c# #multithreading #postgresql #plinq #npgsql

#c# #многопоточность #postgresql #plinq #npgsql

Вопрос:

У меня есть следующий оператор PLINQ в программе C #:

  foreach (ArrestRecord arrest in
            from row in arrestQueue.AsParallel()
            select row)
        {
            Geocoder geocodeThis = new Geocoder(arrest);
            writeQueue.Enqueue(geocodeThis.Geocode());
            Console.Out.WriteLine("Enqueued "     k);
        }
  

Оба arrestQueue и writeQueue являются ConcurrentQueues.

Ничего не выполняется параллельно:

  • Во время выполнения общая загрузка процессора составляет около 30%, и это при том, что все остальное тоже запущено. У меня 8 ядер (гиперпоточность на Core i7 720QM с 4 физическими ядрами), и 4 из 8 ядер практически не используются вообще. Остальные выполняются примерно на 40-50%.
  • Загрузка диска обычно равна 0%, а сеть не используется, за исключением запросов к базе данных Postgres на localhost (см. Ниже).
  • Если я добавлю точку останова где-нибудь внутри geocodeThis.Geocode() , в выпадающем списке Thread Visual Studio просто будет указано [pid] Main Thread. Он никогда не переходит ни в какой другой поток.
  • Я использую Npgsql для подключения к Postgres, и каждый поток выполняет несколько запросов SELECT к таблице. Я запускаю приложение состояния сервера pgAdmin III, которое показывает pg_stat_activity. Благодаря мониторингу этого и стратегическому размещению точки останова (см. Выше), я вижу, что приложение никогда не имеет более 1 открытого подключения к базе данных для всех предположительно запущенных одновременно потоков geocodeThis.Geocode() . Даже если я добавлю Pooling = false в строку подключения к БД, чтобы принудительно не объединять соединения в пул, я никогда не увижу, что в geocodeThis.Geocode() используется более 1 соединения.
  • Таблица Postgres индексируется по каждому столбцу в предложении WHERE. Даже если бы он был плохо проиндексирован, я бы ожидал большого использования диска. Если бы Postgres поддерживал работу каким-либо другим способом, похоже, это поглотило бы ядро.

Это похоже на простой пример PLINQ, и я ломаю голову над тем, почему ничего не выполняется параллельно.

Ответ №1:

Вы распараллеливаете только перечисление assertQueue самого IEnumerable , а затем «непараллеливаете» его обратно в обычное, не имеющее аналогов, значение.,,. Все это происходит еще до того, как foreach начнется цикл. Затем вы используете обычный IEnumerable с foreach , который последовательно запускает тело цикла.

Существует множество способов параллельного выполнения тела цикла, но первый, который приходит на ум, — это использование Parallel.ForEach :

 Parallel.ForEach(arrestQueue, arrest =>
    {
        Geocoder geocodeThis = new Geocoder(arrest);
        writeQueue.Enqueue(geocodeThis.Geocode());
        Console.Out.WriteLine("Enqueued "     k);
    });
  

Комментарии:

1. Там есть второе перечисление — я тоже пропустил его в первый раз. Но после добавления этого это тоже работает.

2. @Chris: arrestQueue имеет тип ConcurrentQueue<ArrestRecord> record. Попробуйте скомпилировать свой (отредактированный) код и исходный код poster с использованием фиктивных типов.

3. Подтверждаю, вы правы! Считайте это как второе перечисление. Отредактировано соответствующим образом.

4. @Chris: Это перечисление. Это просто перечисление foreach (var item in input) yield return item; разновидности. 🙂

5. Спасибо!! Это сработало, но я в замешательстве: как мне выполнить параллелизацию в оригинальном PLINQ? Я вижу, вы используете библиотеку параллельных задач вместо PLINQ…

Ответ №2:

Выполнение каждого запроса через параллельную коллекцию по-прежнему является однопоточной операцией. .AsParallel возвращает коллекцию, которая определяет a .Метод ForAll, который может (но по контракту не всегда) выполняться параллельно. Код для этого был бы:

 arrestQueue.AsParallel().ForAll(arrest=>
    {
        Geocoder geocodeThis = new Geocoder(arrest);
        writeQueue.Enqueue(geocodeThis.Geocode());
        Console.Out.WriteLine("Enqueued "     k);
    });