многопроцессорная обработка imap_unordered в python

#python #multiprocessing

#python #многопроцессорная обработка

Вопрос:

Я создаю программу, которая считывает несколько файлов и записывает краткое описание каждого файла в выходной файл. Размер выходного файла довольно большой, поэтому хранить его в памяти не очень хорошая идея. Я пытаюсь разработать многопроцессорный способ выполнения этого. До сих пор самый простой способ, с которым я смог прийти, это:

 pool = Pool(processes=4)
it = pool.imap_unordered(do, glob.iglob(aglob))
for summary in it:
    writer.writerows(summary)
  

do — это функция, которая суммирует файл. writer — это объект csv.writer

Но правда в том, что я все еще не полностью понимаю multiprocessing.imap. Означает ли это, что 4 сводки вычисляются параллельно и что, когда я читаю одну из них, начинает вычисляться 5-я?

Есть ли лучший способ сделать это?

Спасибо.

Ответ №1:

processes=4 означает, что многопроцессорная обработка запустит пул с четырьмя рабочими процессами и отправит им рабочие элементы. В идеале, если ваша система поддерживает это, т. Е. либо у вас четыре ядра, либо рабочие не полностью привязаны к процессору, 4 рабочих элемента будут обрабатываться параллельно.

Я не знаю реализацию многопроцессорной обработки, но я думаю, что результаты do будут кэшироваться внутренне еще до того, как вы их прочитаете, т. Е. 5-й элемент будет вычислен после завершения любого процесса с элементом из первой волны.

Если есть лучший способ, зависит от типа ваших данных. Сколько всего файлов, которые нуждаются в обработке, насколько велики summary объекты и т.д. Если у вас много файлов (скажем, более 10 кб), их пакетная обработка может быть вариантом, с помощью

 it = pool.imap_unordered(do, glob.iglob(aglob), chunksize=100)
  

Таким образом, рабочий элемент представляет собой не один файл, а 100 файлов, и результаты также сообщаются пакетами по 100. Если у вас много рабочих элементов, разбиение на фрагменты снижает накладные расходы на выделение и отмену выделения результирующих объектов.

Комментарии:

1. «разбиение на фрагменты снижает накладные расходы на извлечение и отмену выделения результирующих объектов». — так почему бы тогда не сделать chunksize = len(iterable) / number of processes ? Каков компромисс?

2. @AdamParkin возможно, процессор не будет обрабатывать каждый элемент с одинаковой скоростью, вы хотите иметь стек «готовых к отправке» элементов, чтобы заполнить ожидающие процессоры.

3. хм, я думаю, тогда у вас должно быть что-то вроде max(3*nproc, len(it)) заданий. что означает chunksize = len(it) / max(3*nproc, len(it)) . или есть хорошая эвристика?