Нельзя использовать многопроцессорную обработку Python с большим объемом вычислений

#python #numpy #multiprocessing

Вопрос:

Я должен ускорить свой текущий код, чтобы выполнить около 10^6 операций за приемлемое время. Прежде чем я использовал многопроцессорную обработку в своем фактическом документе, я попытался сделать это в фиктивном случае. Ниже приводится моя попытка:

 def chunkIt(seq, num):
    avg = len(seq) / float(num)
    out = []
    last = 0.0

    while last < len(seq):
        out.append(seq[int(last):int(last   avg)])
        last  = avg

    return out
 
def do_something(List):
    # in real case this function takes about 0.5 seconds to finish for each 
    iteration
    turn = []
    for e in List:
        turn.append((e[0]**2, e[1]**2,e[2]**2))
    return turn
    
t1 = time.time()

List = []
#in the real case these 20's can go as high as 150
for i in range(1,20-2):
    for k in range(i 1,20-1):
        for j in range(k 1,20):
            List.append((i,k,j))
            
t3 = time.time() 
test = []    

List = chunkIt(List,3)

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = executor.map(do_something,List)
        for result in results:
            test.append(result)       
    
test= np.array(test)
t2 = time.time()

T = t2-t1
T2 = t3-t1
 

Однако, когда я увеличиваю размер своего «Списка», мой компьютер устает использовать всю мою оперативную память и процессор и зависает. Я даже разрезал свой «Список» на 3 части, так что в нем будут использоваться только 3 моих ядра. Однако ничего не изменилось. Кроме того, когда я попытался использовать его на меньшем наборе данных, я заметил, что код работает намного медленнее, чем когда он работал на одном ядре.
Я все еще очень новичок в многопроцессорной обработке в Python, я делаю что-то не так. Я был бы признателен, если бы вы могли мне помочь.

Комментарии:

1. Каждый подпроцесс будет создавать основную List переменную, а затем разбивать ее на части, что, вероятно, не поможет. Поместите весь этот установочный код в if __name__ == "__main__": блок

2. И не используйте список имен переменных, это зарезервированное имя в Python

3. when I increase the size of my "List" my computer tires to use all of my RAM and CPU and freezes . Насколько велик ваш «список»? Использование 150x150x150 дает список из примерно 3 миллионов предметов, и это составляет около 40 МБ. Интересно, используете ли вы гораздо больший набор данных?

4. @JawadAhmadKhan Есть ли вероятность, что вы сделали это неправильно. Аргумент to imap должен быть генератором или выражением генератора.

5. @JawadAhmadKhan Я думаю, что у меня будет возможность позже сегодня взглянуть на это.

Ответ №1:

Чтобы уменьшить использование памяти, я предлагаю вам вместо этого использовать multiprocessing модуль и, в частности, метод imap (или метод imap_unordered). В отличие от map метода или multiprocessing.Pool или concurrent.futures.ProcessPoolExecutor , итерационный аргумент обрабатывается лениво. Это означает, что если вы используете функцию генератора или выражение генератора для повторяющегося аргумента, вам не нужно создавать полный список аргументов в памяти; когда процессор в пуле освободится и будет готов выполнять больше задач, генератор будет вызван для генерации следующего аргумента для imap вызова.

По умолчанию используется значение размера фрагмента 1, что может быть неэффективным для большого размера с возможностью повторения. При использовании map и значении по умолчанию None для аргумента chunksize пул сначала посмотрит длину list итерируемого, при необходимости преобразуя его в a, а затем вычислит, какой размер он считает эффективным, исходя из этой длины и размера пула. При использовании imap или imap_unordered преобразование итерируемого в a list приведет к нарушению всей цели использования этого метода. Но если вы знаете, каким был бы этот размер (более или менее), если бы итерационный были преобразованы в список, то нет причин не применять тот же расчет map размера кусков, который был бы у метода, и это то, что делается ниже.

Следующие тесты выполняют ту же обработку сначала как отдельный процесс, а затем с использованием многопроцессорной imap обработки, где каждый вызов do_something на моем рабочем столе занимает примерно 0,5 секунды. do_something теперь он был изменен, чтобы просто обрабатывать один кортеж i, k, j, так как больше нет необходимости разбивать что-либо на более мелкие списки:

 from multiprocessing import Pool, cpu_count
import time

def half_second():
    HALF_SECOND_ITERATIONS = 10_000_000
    sum = 0
    for _ in range(HALF_SECOND_ITERATIONS):
        sum  = 1
    return sum

def do_something(tpl):
    # in real case this function takes about 0.5 seconds to finish for each  iteration
    half_second() # on my desktop
    return tpl[0]**2, tpl[1]**2, tpl[2]**2

"""
def generate_tpls():
    for i in range(1, 20-2):
        for k in range(i 1, 20-1):
            for j in range(k 1, 20):
                yield i, k, j
"""

# Use smaller number of tuples so we finish in a reasonable amount of time:
def generate_tpls():
    # 64 tuples:
    for i in range(1, 5):
        for k in range(1, 5):
            for j in range(1, 5):
                yield i, k, j

def benchmark1():
    """ single processing """
    t = time.time()
    for tpl in generate_tpls():
        result = do_something(tpl)
    print('benchmark1 time:', time.time() - t)

def compute_chunksize(iterable_size, pool_size):
    """ This is more-or-less the function used by the Pool.map method """
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize  = 1
    return chunksize

def benchmark2():
    """ multiprocssing """
    t = time.time()
    pool_size = cpu_count() # 8 logical cores (4 physical cores)
    N_TUPLES = 64 # number of tuples that will be generated
    pool = Pool(pool_size)
    chunksize = compute_chunksize(N_TUPLES, pool_size)
    for result in pool.imap(do_something, generate_tpls(), chunksize=chunksize):
        pass
    print('benchmark2 time:', time.time() - t)


if __name__ == '__main__':
    benchmark1()
    benchmark2()
 

С принтами:

 benchmark1 time: 32.261038303375244
benchmark2 time: 8.174998044967651
 

Комментарии:

1. Спасибо вам за ответ. Однако, когда я увеличиваю количество кортежей примерно до 100, та же проблема все еще существует. Компьютер просто задыхается и зависает. В то время как benchmark1 может, по крайней мере, завершить выполнение кода.

2. Во-первых, я обновил код, добавив переменную N_TUPLES, benchmark2 чтобы подчеркнуть, что вычисление chunksize зависит от количества кортежей, которые вы в конечном итоге создадите. Во-вторых, я думаю, вам нужно будет обновить свой вопрос с вашим фактическим do_something(tpl) определением функции, чтобы мы могли видеть, что происходит.

Ответ №2:

Проблема заключается в вложенных циклах For, создающих массив перед основным определением. Перемещение этой части под основным определением устраняет любые проблемы с памятью.

 def chunkIt(seq, num):
    avg = len(seq) / float(num)
    out = []
    last = 0.0

    while last < len(seq):
        out.append(seq[int(last):int(last   avg)])
        last  = avg

    return out
 
def do_something(List):
    # in real case this function takes about 0.5 seconds to finish for each 
    iteration
    turn = []
    for e in List:
        turn.append((e[0]**2, e[1]**2,e[2]**2))
    return turn
    

if __name__ == '__main__':
   t1 = time.time()

   List = []
   #in the real case these 20's can go as high as 150
   for i in range(1,20-2):
     for k in range(i 1,20-1):
        for j in range(k 1,20):
            List.append((i,k,j))
            
   t3 = time.time() 
   test = []    

   List = chunkIt(List,3)
   with concurrent.futures.ProcessPoolExecutor() as executor:
        results = executor.map(do_something,List)
        for result in results:
            test.append(result)       
    
   test= np.array(test)
   t2 = time.time()

   T = t2-t1
   T2 = t3-t1