Как я могу распараллелить следующий фрагмент кода на python?

#python #numpy #parallel-processing #multiprocessing #dask

#python #numpy #параллельная обработка #многопроцессорная обработка #dask

Вопрос:

У меня есть куча операций умножения матриц, которые выполняются только по строкам. Мне было интересно, как ускорить вычисления путем распараллеливания:

 data = np.random.randint(1, 100, (100000, 800))
indices_1 = np.equal(data, 1)
A = np.zeros((100000, 100))
B = np.random.randn(800, 100)

for i in range(100000):
   ones = indices_1[i]
   not_ones = ~indices_1[i]
   B_ones = B[ones]
   B_not_ones = B[not_ones]
   A[i] = (data[i][not_ones] @ B_not_ones) @ np.linalg.inv(B_not_ones.T @ B_not_ones)  
   data[i][ones] = A[i] @ B_ones.T
    
 

Я пробовал многопроцессорный, но по какой-то причине он работал не лучше, чем последовательный. Вот моя многопроцессорная реализация:

 from multiprocessing.pool import ThreadPool, Pool
pool = ThreadPool() # can also use Pool

def f(i):
   ones = indices_1[i]
   not_ones = ~indices_1[i]
   B_ones = B[ones]
   B_not_ones = B[not_ones]
   A[i] = (data[i][not_ones] @ B_not_ones) @ np.linalg.inv(B_not_ones.T @ B_not_ones)  
   data[i][ones] = A[i] @ B_ones.T


pool.map(f, range(100000))
 

Оба дали одинаковое время выполнения (около 32 секунд). Другой метод распараллеливания, например concurrent.futures , не улучшил время выполнения (используется, как показано ниже):

 with concurrent.futures.ThreadPoolExecutor() as executor:
     result = executor.map(f, range(100000))
 

Я также пытался применить dask , но не смог заставить их фреймворк работать в моем случае. Любая помощь будет высоко оценена! Спасибо!

Комментарии:

1. Многопроцессорная обработка должна копировать ваши данные между процессами. Он не подходит для обработки одного большого фрагмента данных.

2. подходит ли вам графический процессор? пробовал ваш код с версией GPU в Google colab?

3. Я бы полностью удалил цикл for и просто позволил numpy обрабатывать матричные операции.

4. @ZeelBharatkumarPatel1931006 Я только что попробовал использовать GPU в Google colab, время выполнения для обоих сократилось до 28, но многопроцессор не улучшил время выполнения.

5. вы должны использовать многопоточный модуль, потому что при многопроцессорной обработке каждый рабочий процесс получает свою собственную память, и вы не получите желаемого результата, вы можете использовать cocurrent.futures . ThreadPoolExecutor

Ответ №1:

 import numpy as np
import multiprocessing as mp


data = list(np.random.randint(1, 100, (100000, 800)))
indices_1 = np.equal(data, 1)
A = list(np.zeros((100000, 100)))
B = np.random.randn(800, 100)


def f(data, A, i):
    ones = indices_1[i]
    not_ones = ~indices_1[i]
    B_ones = B[ones]
    B_not_ones = B[not_ones]
    A[i] = (data[i][not_ones] @ B_not_ones) @ np.linalg.inv(B_not_ones.T @ B_not_ones)
    data[i][ones] = A[i] @ B_ones.T

with mp.Manager() as manager:
    data_global = manager.list(data)
    A_global = manager.list(A)

    with mp.Pool() as p:
        results = [ p.apply_async(f, (data_global, A_global, i,)) for i in range(100000) ]
        for i in results:
            i.wait()

    data_global = list(data_global)
    A_global = list(A_global)
 

Комментарии:

1. Спасибо за предоставленный код, но по-прежнему безуспешно:( Я попытался изменить max_workers с 2 на cpu_count() * 4 , но все они имеют более высокое время выполнения, чем последовательный аналог.

2. Хорошо, позвольте мне запустить ваш код локально, какой @ символ в коде?

3. Это эквивалентно np.matmul() (я прочитал это в документации NumPy)

4. Многопоточность в Python ограничена через Gil. Хотя это может обеспечить повышение производительности, это не эквивалентно немедленному параллельному выполнению n раз.

5. Я обновил код, я получил примерно 6-7-кратное улучшение скорости с 4-ядерным процессором, не могли бы вы проверить, что результат такой же, как последовательный