Почему последовательный код в этом случае быстрее, чем параллельный.фьючерсы?

#python-3.x #multiprocessing #concurrent.futures

Вопрос:

Я использую следующий код для обработки некоторых изображений для моего проекта ML, и я хотел бы распараллелить его.

 import multiprocessing as mp import concurrent.futures  def track_ids(seq):  '''The func is so big I can not put it here'''  ood = {}  for i in seq:  # I load around 500 images and process them  ood[i] = some Value  return ood  seqs = [] for seq in range(1, 10):# len(seqs) 1):  seq = txt str(seq)  seqs.append(seq)  # serial call of the function  track_ids(seq)  #parallel call of the function with concurrent.futures.ProcessPoolExecutor(max_workers=mp.cpu_count()) as ex:  ood_id = ex.map(track_ids, seqs)  

если я запускаю код последовательно, это занимает 3.0 несколько минут, но для параллельного с параллельным это занимает 3.5 несколько минут. может кто-нибудь, пожалуйста, объяснить, почему это так? и представить способ решения проблемы.

кстати, у меня 12 ядер. Спасибо

Комментарии:

1. трудно сказать… очевидно, что в структуре кода нет ничего плохого. Каждый раз, когда слова «скорость» слетают с ваших уст, вы должны быть в курсе. По общему признанию, профилирование многопроцессорного кода сложнее, чем один процесс, но существует множество инструментов.

Ответ №1:

Вот краткий пример того, как можно было бы профилировать многопроцессорный код по сравнению с последовательным выполнением:

 from multiprocessing import Pool from cProfile import Profile from pstats import Stats import concurrent.futures  def track_ids(seq):  '''The func is so big I can not put it here'''  ood = {}  for i in seq:  # I load around 500 images and process them  ood[i] = some Value  return ood  def profile_seq():  p = Profile() #one and only profiler instance  p.enable()  seqs = []  for seq in range(1, 10):# len(seqs) 1):  seq = txt str(seq)  seqs.append(seq)  # serial call of the function  track_ids(seq)  p.disable()  return Stats(p), seqs   def track_ids_pr(seq):  p = Profile() #profile the child tasks  p.enable()    retval = track_ids(seq)    p.disable()  return (Stats(p, stream="dummy"), retval)   def profile_parallel():  p = Profile() #profile stuff in the main process  p.enable()    with concurrent.futures.ProcessPoolExecutor(max_workers=mp.cpu_count()) as ex:  retvals = ex.map(track_ids_pr, seqs)    p.disable()  s = Stats(p)    out = []  for ret in retvals:  s.add(ret[0])  out.append(ret[1])    return s, out   if __name__ == "__main__":  stat, retval = profile_parallel()  stat.print_stats()  

РЕДАКТИРОВАТЬ: К сожалению, я обнаружил, что pstat.Stats объекты нельзя использовать нормально, multiprocessing.Queue потому что они не поддаются маринованию (что необходимо для работы concurrent.futures ). Очевидно, что обычно он хранит ссылку на файл с целью записи статистики в этот файл, и если таковой не указан, по умолчанию он захватит ссылку на sys.stdout . Однако на самом деле нам эта ссылка не нужна, пока мы действительно не захотим распечатать статистику, поэтому мы можем просто присвоить ей временное значение, чтобы предотвратить ошибку рассола, а затем восстановить соответствующее значение позже. Следующий пример должен быть с возможностью копирования-вставки и работать просто отлично, а не в приведенном выше примере псевдокодирования.

 from multiprocessing import Queue, Process from cProfile import Profile from pstats import Stats import sys  def isprime(x):  for d in range(2, int(x**.5)):  if x % d == 0:  return False  return True  def foo(retq):  p = Profile()  p.enable()    primes = []  max_n = 2**20  for n in range(3, max_n):  if isprime(n):  primes.append(n)    p.disable()  retq.put(Stats(p, stream="dummy")) #Dirty hack: set `stream` to something picklable then override later  if __name__ == "__main__":  q = Queue()    p1 = Process(target=foo, args=(q,))  p1.start()    p2 = Process(target=foo, args=(q,))  p2.start()    s1 = q.get()  s1.stream = sys.stdout #restore original file  s2 = q.get()  # s2.stream #if we are just adding this `Stats` object to another the `stream` just gets thrown away anyway.    s1.add(s2) #add up the stats from both child processes.  s1.print_stats() #s1.stream gets used here, but not before. If you provide a file to write to instead of sys.stdout, it will write to that file)    p1.join()  p2.join()  

Комментарии:

1. Целью профилирования было бы, как мы надеемся, определить, где в вашем коде находится узкое место. Для меня вполне вероятно, что вы, возможно, уже исчерпали, например, жесткий диск, на котором хранятся все изображения, и в этом случае вы просто увеличиваете накладные расходы в и без того ограниченной системе. Однако профилирование поможет вам точно выяснить, где происходит задержка

2.Я получаю следующую ошибку при профилировании, которая исходит из for ret in retvals: etc. concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

3. @Dariyouh Мой плохой за то, что не проверял.. К сожалению pstat.Stats , обычно не поддается отбору, который необходим для отправки результатов из дочернего процесса. Типичным решением (насколько я могу понять из документации по профилированию) было бы записать статистику в файл из дочернего процесса, а затем повторно открыть эти файлы, чтобы загрузить их в основной процесс… в качестве альтернативы мы можем просто предоставить Stats конструктору поддельный файловый поток, который можно выбрать, потому что на самом деле ему это все равно не нужно.