#python #numpy #multiprocessing
Вопрос:
Я должен ускорить свой текущий код, чтобы выполнить около 10^6 операций за приемлемое время. Прежде чем я использовал многопроцессорную обработку в своем фактическом документе, я попытался сделать это в фиктивном случае. Ниже приводится моя попытка:
def chunkIt(seq, num):
avg = len(seq) / float(num)
out = []
last = 0.0
while last < len(seq):
out.append(seq[int(last):int(last avg)])
last = avg
return out
def do_something(List):
# in real case this function takes about 0.5 seconds to finish for each
iteration
turn = []
for e in List:
turn.append((e[0]**2, e[1]**2,e[2]**2))
return turn
t1 = time.time()
List = []
#in the real case these 20's can go as high as 150
for i in range(1,20-2):
for k in range(i 1,20-1):
for j in range(k 1,20):
List.append((i,k,j))
t3 = time.time()
test = []
List = chunkIt(List,3)
if __name__ == '__main__':
with concurrent.futures.ProcessPoolExecutor() as executor:
results = executor.map(do_something,List)
for result in results:
test.append(result)
test= np.array(test)
t2 = time.time()
T = t2-t1
T2 = t3-t1
Однако, когда я увеличиваю размер своего «Списка», мой компьютер устает использовать всю мою оперативную память и процессор и зависает. Я даже разрезал свой «Список» на 3 части, так что в нем будут использоваться только 3 моих ядра. Однако ничего не изменилось. Кроме того, когда я попытался использовать его на меньшем наборе данных, я заметил, что код работает намного медленнее, чем когда он работал на одном ядре.
Я все еще очень новичок в многопроцессорной обработке в Python, я делаю что-то не так. Я был бы признателен, если бы вы могли мне помочь.
Комментарии:
1. Каждый подпроцесс будет создавать основную
List
переменную, а затем разбивать ее на части, что, вероятно, не поможет. Поместите весь этот установочный код вif __name__ == "__main__":
блок2. И не используйте список имен переменных, это зарезервированное имя в Python
3.
when I increase the size of my "List" my computer tires to use all of my RAM and CPU and freezes
. Насколько велик ваш «список»? Использование 150x150x150 дает список из примерно 3 миллионов предметов, и это составляет около 40 МБ. Интересно, используете ли вы гораздо больший набор данных?4. @JawadAhmadKhan Есть ли вероятность, что вы сделали это неправильно. Аргумент to
imap
должен быть генератором или выражением генератора.5. @JawadAhmadKhan Я думаю, что у меня будет возможность позже сегодня взглянуть на это.
Ответ №1:
Чтобы уменьшить использование памяти, я предлагаю вам вместо этого использовать multiprocessing
модуль и, в частности, метод imap (или метод imap_unordered). В отличие от map
метода или multiprocessing.Pool
или concurrent.futures.ProcessPoolExecutor
, итерационный аргумент обрабатывается лениво. Это означает, что если вы используете функцию генератора или выражение генератора для повторяющегося аргумента, вам не нужно создавать полный список аргументов в памяти; когда процессор в пуле освободится и будет готов выполнять больше задач, генератор будет вызван для генерации следующего аргумента для imap
вызова.
Следующие тесты выполняют ту же обработку сначала как отдельный процесс, а затем с использованием многопроцессорной imap
обработки, где каждый вызов do_something
на моем рабочем столе занимает примерно 0,5 секунды. do_something
теперь он был изменен, чтобы просто обрабатывать один кортеж i, k, j, так как больше нет необходимости разбивать что-либо на более мелкие списки:
from multiprocessing import Pool, cpu_count
import time
def half_second():
HALF_SECOND_ITERATIONS = 10_000_000
sum = 0
for _ in range(HALF_SECOND_ITERATIONS):
sum = 1
return sum
def do_something(tpl):
# in real case this function takes about 0.5 seconds to finish for each iteration
half_second() # on my desktop
return tpl[0]**2, tpl[1]**2, tpl[2]**2
"""
def generate_tpls():
for i in range(1, 20-2):
for k in range(i 1, 20-1):
for j in range(k 1, 20):
yield i, k, j
"""
# Use smaller number of tuples so we finish in a reasonable amount of time:
def generate_tpls():
# 64 tuples:
for i in range(1, 5):
for k in range(1, 5):
for j in range(1, 5):
yield i, k, j
def benchmark1():
""" single processing """
t = time.time()
for tpl in generate_tpls():
result = do_something(tpl)
print('benchmark1 time:', time.time() - t)
def compute_chunksize(iterable_size, pool_size):
""" This is more-or-less the function used by the Pool.map method """
chunksize, remainder = divmod(iterable_size, 4 * pool_size)
if remainder:
chunksize = 1
return chunksize
def benchmark2():
""" multiprocssing """
t = time.time()
pool_size = cpu_count() # 8 logical cores (4 physical cores)
N_TUPLES = 64 # number of tuples that will be generated
pool = Pool(pool_size)
chunksize = compute_chunksize(N_TUPLES, pool_size)
for result in pool.imap(do_something, generate_tpls(), chunksize=chunksize):
pass
print('benchmark2 time:', time.time() - t)
if __name__ == '__main__':
benchmark1()
benchmark2()
С принтами:
benchmark1 time: 32.261038303375244
benchmark2 time: 8.174998044967651
Комментарии:
1. Спасибо вам за ответ. Однако, когда я увеличиваю количество кортежей примерно до 100, та же проблема все еще существует. Компьютер просто задыхается и зависает. В то время как benchmark1 может, по крайней мере, завершить выполнение кода.
2. Во-первых, я обновил код, добавив переменную N_TUPLES,
benchmark2
чтобы подчеркнуть, что вычислениеchunksize
зависит от количества кортежей, которые вы в конечном итоге создадите. Во-вторых, я думаю, вам нужно будет обновить свой вопрос с вашим фактическимdo_something(tpl)
определением функции, чтобы мы могли видеть, что происходит.
Ответ №2:
Проблема заключается в вложенных циклах For, создающих массив перед основным определением. Перемещение этой части под основным определением устраняет любые проблемы с памятью.
def chunkIt(seq, num):
avg = len(seq) / float(num)
out = []
last = 0.0
while last < len(seq):
out.append(seq[int(last):int(last avg)])
last = avg
return out
def do_something(List):
# in real case this function takes about 0.5 seconds to finish for each
iteration
turn = []
for e in List:
turn.append((e[0]**2, e[1]**2,e[2]**2))
return turn
if __name__ == '__main__':
t1 = time.time()
List = []
#in the real case these 20's can go as high as 150
for i in range(1,20-2):
for k in range(i 1,20-1):
for j in range(k 1,20):
List.append((i,k,j))
t3 = time.time()
test = []
List = chunkIt(List,3)
with concurrent.futures.ProcessPoolExecutor() as executor:
results = executor.map(do_something,List)
for result in results:
test.append(result)
test= np.array(test)
t2 = time.time()
T = t2-t1
T2 = t3-t1