Запуск скрипта python на 16 процессорах вместо 1 процессора

#python #bash #cpu

Вопрос:

У меня есть скрипт bash, который активирует скрипт на python:

 #!/bin/bash #SBATCH -J XXXXX #SBATCH --ntasks=1 #SBATCH --cpus-per-task=16  python my_python_script.py  

Скрипт python сканирует очень большой файл (~480 000 000 строк) и создает словарь, который позже будет записан в качестве выходного файла:

 with open (huge_file,'r') as hugefile, open (final_file, 'w') as final:  reader= csv.reader (hugefile, delimiter="t")  writer= csv.writer (final, delimiter="t")   d={}    for r in reader:  v=r[0] r[1]  if v not in d.keys():  d[v]=[r[5],r[4]]  else:  d[v].append([r[5],r[4]])   for k,v in d.items():  #analyses   nl = [different variables]  writer.writerow(nl)  

Из-за размера файла я хочу использовать 16 процессоров для запуска, но, несмотря на то, что я определил 16 процессоров в своем сценарии bash, он использует только 1 процессор.

Я много читал о подпроцессе, но, похоже, в данном случае это неприменимо. Я был бы рад услышать любые предложения.

Комментарии:

1. Вы также можете попробовать использовать потоки. Тем не менее, я бы не исключил, что ограничение связано с вводом-выводом, а не с процессором.

2. Многопроцессорная обработка-это оптимальный механизм для распределения вашей работы между несколькими процессорами. Однако вам потребуется разбить входной файл на куски и передать каждый «блок» данных в отдельный подпроцесс для анализа. Затем каким-то образом вам придется координировать ответы/вывод подпроцесса в выходной CSV-файл. Таким образом, вы могли бы использовать многопроцессорную обработку, но если использование одного процесса/потока не является слишком медленным для ваших нужд, я бы сделал это просто.

3. even though I defined 16 CPUs in my bash script Вы задаете некоторый параметр sbatch, который не относится к bash или python. Это часть системы массового обслуживания SLURM и сообщает ей, сколько процессоров может использоваться в этой работе. Но вам нужно реализовать многопроцессорную обработку самостоятельно, как предполагают другие ответы/комментарии.

4. Одновременное чтение нескольких процессов из файла, скорее всего, приведет к замедлению, а не к ускорению. Причина медленной работы заключается в том, что ОС приходится ждать диска. Теперь вы заставляете диск выполнять больше работы, перемещаясь в разные места файла. Гораздо лучший подход, вероятно, заключается в том, чтобы сохранить чтение в одном процессе; возможно, вместо этого подумайте об использовании менее бессмысленного формата хранения, чем CSV. Если вам нужен произвольный доступ к данным, прочитайте их в базе данных.

5. @tripleee какой формат вы бы предложили, кроме CSV?

Ответ №1:

Ядра здесь вам не помогут, так как манипуляция со словарем тривиальна и чрезвычайно быстра.

Здесь у вас проблема ввода-вывода, где чтение и запись файлов является узким местом.

Если вы используете модуль многопроцессорной обработки, вы можете столкнуться с другими проблемами. Создаваемые словари будут независимы друг от друга, поэтому у вас будут дубликаты ключей, каждый с другими данными. Если необходимо сохранить порядок данных CSV, возможно, потому, что это данные временных рядов, вам придется объединить, а затем отсортировать массивы в словаре в качестве дополнительного шага, если вы не учтете эту проблему при объединении 16 словарей. Это также означает, что вы будете разбивать CSV на 16 блоков и обрабатывать их по отдельности на каждом ядре, чтобы вы могли отслеживать порядок.

Вы рассматривали возможность чтения огромного CSV-файла в базу данных SQLite? Это, по крайней мере, даст вам больше контроля над тем, как осуществляется доступ к данным, поскольку 16 процессов могут одновременно получать доступ к данным при указании порядка.

Я действительно сомневаюсь, что здесь есть что-то, что можно было бы распараллелить. Даже если вы используете модуль многопроцессорной обработки, вам необходимо записать весь файл с учетом всего словаря, что ограничивает ваш способ распараллеливания этой задачи.

Ответ №2:

Многопроцессорную обработку трудно применять, потому что все должно быть отсортировано в центральном диктанте. Нескольким процессам последовательно нужно будет знать, какие ключи уже есть в диктанте, и это делает его действительно сложным. Таким образом, более простое решение-попытаться ускорить обработку, оставаясь в рамках одного процесса. понимание диктанта и списка, по-видимому, является хорошим способом продвижения вперед:

 # prepare dict keys and empty list entries: d = {r[0] r[1]: [] for r in reader}  # fill dict [d[r[0] r[1]].append([r[5], r[4]]) for r in reader]  # d is ready for analysis  

Ответ №3:

Вот идея, как использовать несколько процессов (еще не протестировано с большим файлом, но уверен, что он будет работать при отладке).

Шаг 1 состоит в том, чтобы разделить огромный файл на сегменты с помощью функции Linux split :

 bashgt; split -l 10000000 hugefile segment  

Это приведет к созданию файлов, каждый из которых содержит 10 000 000 строк, и имена будут segmentaa, segmentab, .... (см. man страницу split )

Теперь программа Python для чтения этих сегментов файлов запускает по одному процессу на сегмент файла, а затем объединяет результаты в один дикт:

 import multiprocessing as mp import csv   # define process function working on a file segment def proc_target(q, filename):  with open(filename, 'r') as file_segment:  reader = csv.reader(file_segment, delimiter="t")   dd = dict()   def func(r):  key = r[0]   r[1]  if key in dd:  dd[key].append([r[5], r[4]])  else:  dd[key] = [r[5], r[4]]   [func(r) for r in reader]   # send result via queue to main process  q.put(dd)   if __name__ == '__main__':  segment_names = ['segmentaa', 'segmentab', 'segmentac'] # maybe there are more file segments ...  processes = dict() # all objects needed are stored in this dict  mp.set_start_method('spawn')   # launch processes  for fn in segment_names:  q = mp.Queue()  p = mp.Process(target=proc_target, args=(q, fn))  p.start()   processes[fn] = dict()  processes[fn]["process"] = p  processes[fn]["queue"] = q   # read results  for fn in segment_names:  processes[fn]["result"] = processes[fn]["queue"].get()  processes[fn]["process"].join()   # consolidate all results  # start with first segment result and merge the others into it  d = processes[segment_names[0]]["result"]   # helper function for fast execution using list comprehension  def consolidate(key, value):  if key in d:  d[key].append(value)  else:  d[key] = value   # merge other results into d  for fn in segment_names[1:]:  [consolidate(key, value) for key, value in processes[fn]["result"].items()]   # d is ready  

Чтобы избежать узких мест ввода-вывода, было бы разумно распределить сегменты по нескольким дискам и позволить параллельным процессам параллельно получать доступ к различным ресурсам ввода-вывода.

Комментарии:

1. Это вряд ли сработает, потому что каждый файл должен быть последовательно действительным CSV, которым они вряд ли будут, когда будут разбиты на произвольные фрагменты

2. Это предполагает, что на строку приходится одна запись csv, и каждая строка закрывается новой строкой. (что довольно часто встречается). Тогда это сработает, потому что опция разделения есть -l и нет -b

3. Моя ошибка. Я неправильно истолковал флаг -l