#python-3.x #multiprocessing #concurrent.futures
Вопрос:
Я использую следующий код для обработки некоторых изображений для моего проекта ML, и я хотел бы распараллелить его.
import multiprocessing as mp import concurrent.futures def track_ids(seq): '''The func is so big I can not put it here''' ood = {} for i in seq: # I load around 500 images and process them ood[i] = some Value return ood seqs = [] for seq in range(1, 10):# len(seqs) 1): seq = txt str(seq) seqs.append(seq) # serial call of the function track_ids(seq) #parallel call of the function with concurrent.futures.ProcessPoolExecutor(max_workers=mp.cpu_count()) as ex: ood_id = ex.map(track_ids, seqs)
если я запускаю код последовательно, это занимает 3.0
несколько минут, но для параллельного с параллельным это занимает 3.5
несколько минут. может кто-нибудь, пожалуйста, объяснить, почему это так? и представить способ решения проблемы.
кстати, у меня 12 ядер. Спасибо
Комментарии:
1. трудно сказать… очевидно, что в структуре кода нет ничего плохого. Каждый раз, когда слова «скорость» слетают с ваших уст, вы должны быть в курсе. По общему признанию, профилирование многопроцессорного кода сложнее, чем один процесс, но существует множество инструментов.
Ответ №1:
Вот краткий пример того, как можно было бы профилировать многопроцессорный код по сравнению с последовательным выполнением:
from multiprocessing import Pool from cProfile import Profile from pstats import Stats import concurrent.futures def track_ids(seq): '''The func is so big I can not put it here''' ood = {} for i in seq: # I load around 500 images and process them ood[i] = some Value return ood def profile_seq(): p = Profile() #one and only profiler instance p.enable() seqs = [] for seq in range(1, 10):# len(seqs) 1): seq = txt str(seq) seqs.append(seq) # serial call of the function track_ids(seq) p.disable() return Stats(p), seqs def track_ids_pr(seq): p = Profile() #profile the child tasks p.enable() retval = track_ids(seq) p.disable() return (Stats(p, stream="dummy"), retval) def profile_parallel(): p = Profile() #profile stuff in the main process p.enable() with concurrent.futures.ProcessPoolExecutor(max_workers=mp.cpu_count()) as ex: retvals = ex.map(track_ids_pr, seqs) p.disable() s = Stats(p) out = [] for ret in retvals: s.add(ret[0]) out.append(ret[1]) return s, out if __name__ == "__main__": stat, retval = profile_parallel() stat.print_stats()
РЕДАКТИРОВАТЬ: К сожалению, я обнаружил, что pstat.Stats
объекты нельзя использовать нормально, multiprocessing.Queue
потому что они не поддаются маринованию (что необходимо для работы concurrent.futures
). Очевидно, что обычно он хранит ссылку на файл с целью записи статистики в этот файл, и если таковой не указан, по умолчанию он захватит ссылку на sys.stdout
. Однако на самом деле нам эта ссылка не нужна, пока мы действительно не захотим распечатать статистику, поэтому мы можем просто присвоить ей временное значение, чтобы предотвратить ошибку рассола, а затем восстановить соответствующее значение позже. Следующий пример должен быть с возможностью копирования-вставки и работать просто отлично, а не в приведенном выше примере псевдокодирования.
from multiprocessing import Queue, Process from cProfile import Profile from pstats import Stats import sys def isprime(x): for d in range(2, int(x**.5)): if x % d == 0: return False return True def foo(retq): p = Profile() p.enable() primes = [] max_n = 2**20 for n in range(3, max_n): if isprime(n): primes.append(n) p.disable() retq.put(Stats(p, stream="dummy")) #Dirty hack: set `stream` to something picklable then override later if __name__ == "__main__": q = Queue() p1 = Process(target=foo, args=(q,)) p1.start() p2 = Process(target=foo, args=(q,)) p2.start() s1 = q.get() s1.stream = sys.stdout #restore original file s2 = q.get() # s2.stream #if we are just adding this `Stats` object to another the `stream` just gets thrown away anyway. s1.add(s2) #add up the stats from both child processes. s1.print_stats() #s1.stream gets used here, but not before. If you provide a file to write to instead of sys.stdout, it will write to that file) p1.join() p2.join()
Комментарии:
1. Целью профилирования было бы, как мы надеемся, определить, где в вашем коде находится узкое место. Для меня вполне вероятно, что вы, возможно, уже исчерпали, например, жесткий диск, на котором хранятся все изображения, и в этом случае вы просто увеличиваете накладные расходы в и без того ограниченной системе. Однако профилирование поможет вам точно выяснить, где происходит задержка
2.Я получаю следующую ошибку при профилировании, которая исходит из
for ret in retvals: etc.
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
3. @Dariyouh Мой плохой за то, что не проверял.. К сожалению
pstat.Stats
, обычно не поддается отбору, который необходим для отправки результатов из дочернего процесса. Типичным решением (насколько я могу понять из документации по профилированию) было бы записать статистику в файл из дочернего процесса, а затем повторно открыть эти файлы, чтобы загрузить их в основной процесс… в качестве альтернативы мы можем просто предоставитьStats
конструктору поддельный файловый поток, который можно выбрать, потому что на самом деле ему это все равно не нужно.