Многопроцессорная обработка Python для обработки файлов

#python #python-multiprocessing

Вопрос:

Я никогда раньше ничего не делал с многопроцессорной обработкой, но недавно столкнулся с проблемой, когда один из моих проектов требовал слишком много времени для запуска. У меня есть около 336 000 файлов, которые мне нужно обработать, и традиционный цикл for, скорее всего, займет около недели.

Для этого есть два цикла, но они фактически идентичны в том, что они возвращают, поэтому я включил только один.

 import json
import os
from tqdm import tqdm
import multiprocessing as mp

jsons = os.listdir('/content/drive/My Drive/mrp_workflow/JSONs')

materials = [None] * len(jsons)

def asyncJSONs(file, index):
  try:
    with open('/content/drive/My Drive/mrp_workflow/JSONs/{}'.format(file)) as f:
        data = json.loads(f.read())
    properties = process_dict(data, {})
    properties['name'] = file.split('.')[0]
    materials[index] = properties
  except:
    print("Error parsing at {}".format(file))

process_list = []
i = 0
for file in tqdm(jsons):
    p = mp.Process(target=asyncJSONs,args=(file,i))
    p.start()
    process_list.append(p)
    i  = 1

for process in process_list:
  process.join()
 

Все, что касается многопроцессорной обработки, было собрано из набора поисковых запросов и статей Google, поэтому я не удивлюсь, если это будет неверно. Например, переменная » i » — это грязная попытка сохранить информацию в каком-то порядке.

То, что я пытаюсь сделать, — это загрузить информацию из этих файлов JSON и сохранить ее в переменной materials. Но когда я запускаю свой текущий код, в материалах ничего не хранится.

Комментарии:

1. multiprocessing.Pool и для этого потребуется materials = pool(asyncJSONs, jsons)

Ответ №1:

Как вы можете прочитать в других ответах — процессы не используют общую память, и вы не можете установить значение напрямую materials . Функция должна использовать return , чтобы отправить результат обратно в основной процесс, и она должна дождаться результата и получить его.

С ним может быть проще Pool . Его не нужно использовать queue вручную. И он должен возвращать результаты в том же порядке, что и данные all_jsons . И вы можете установить, сколько процессов должно выполняться одновременно, чтобы он не блокировал процессор для других процессов в системе.

Но он не может использовать tqdm .

Я не мог проверить это, но это может быть что-то вроде этого

 import os
import json
from multiprocessing import Pool

# --- functions ---

def asyncJSONs(filename):
  try:
    fullpath = os.path.join(folder, filename)
    with open(fullpath) as f:
        data = json.loads(f.read())
    properties = process_dict(data, {})
    properties['name'] = filename.split('.')[0]
    return properties
  except:
    print("Error parsing at {}".format(filename))

# --- main ---

# for all processes (on some systems it may have to be outside `__main__`)
folder = '/content/drive/My Drive/mrp_workflow/JSONs'

if __name__ == '__main__':
    # code only for main process
    
    all_jsons = os.listdir(folder)

    with Pool(5) as p:
        materials = p.map(asyncJSONs, all_jsons)

    for item in materials:
        print(item)    
 

кстати:

Другие модули: concurrent.futures, joblib, ray,

Ответ №2:

Я упомяну совершенно другой способ решения этой проблемы. Не утруждайте себя попытками добавить все данные в один и тот же список. Извлеките необходимые данные и добавьте их в какой-либо целевой файл в формате ndjson/jsonlines. Вот только вместо объектов, входящих в массив json [{},{}...] , в каждой строке у вас есть отдельные объекты.

 {"foo": "bar"} 
{"foo": "spam"} 
{"eggs": "jam"} 
 

Рабочий процесс выглядит следующим образом:

  1. создайте N рабочих с манифестом файлов для обработки и выходным файлом для записи. Вам даже не нужен MP, вы можете использовать такой инструмент, как rush, для распараллеливания.
  2. работник анализирует данные, генерирует выходной дикт
  3. работник открывает выходной файл с флагом добавления. сбросьте данные и немедленно сбросьте их:
 with open(out_file, 'a') as fp: 
  print(json.dumps(data), file=fp, flush=True) 
 

Пожалуйста, убедитесь, что до тех пор, пока размер ваших данных меньше размера буфера в вашем ядре (обычно несколько МБ), ваши различные процессы не будут давить друг на друга и конфликтовать при записи. Если они действительно конфликтуют, вам, возможно, потребуется записать отдельный выходной файл для каждого сотрудника, а затем присоединиться ко всем.

Вы можете присоединиться к файлам и/или преобразовать их в обычный массив JSON, если это необходимо, с помощью jq. Честно говоря, просто примите jsonlines. Это гораздо лучший формат данных для длинных списков объектов, так как вам не нужно анализировать все это в памяти.

Комментарии:

1. При вызове операционной системы для открытия (а также сброса и закрытия) файла может потребоваться значительное количество накладных расходов 336 000 раз…

2. @martineau, вероятно, не так много, как вы думаете. Ваша запись данных, вероятно, все еще будет доминировать. Это то, что я получил за добавление 1024 байт на SSD на i9 Мбит / с: time per loop: best=51.321 µs, mean=60.314 ± 11.3 µs

Ответ №3:

Вам нужно понять, как работает многопроцессорная обработка. Он запускает совершенно новый процесс для КАЖДОЙ задачи, каждый с совершенно новым интерпретатором Python, который запускает ваш сценарий заново. Эти процессы никоим образом не разделяют общую память. Другие процессы получают КОПИЮ ваших глобалов, но они, очевидно, не могут быть той же памятью.

Если вам нужно отправить информацию обратно, вы можете использовать multiprocessing.queue о. Попросите функцию поместить результаты в очередь, в то время как ваш основной код ожидает, пока они волшебным образом появятся в очереди.

Также, пожалуйста, прочитайте инструкции в multiprocessing документах о main. Каждый новый процесс будет повторно выполнять весь код в вашем основном файле. Таким образом, любой одноразовый материал абсолютно должен содержаться в

 if __name__ == "__main__":
 

блок. Это один из случаев, когда практика ввода вашего основного кода в вызываемую функцию main() является «лучшей практикой».


Что здесь происходит все это время? Он читает файлы? Если это так, то вы могли бы сделать это с помощью многопоточности вместо многопроцессорной обработки. Однако, если вы ограничены скоростью диска, то никакая многопроцессорная обработка не сократит время выполнения.

Комментарии:

1. Формальности. Ввод вашего кода основной линии в if блок, безусловно, требуется.

2. Многие из присутствующих здесь-новички, так что это имеет значение, ИМО.

3. Честно говоря, я бы предпочел, чтобы они поняли, что main() требуется. Те, кто знает лучше, будут знать лучше.