#python-3.x #multithreading #parallel-processing #nested #joblib
Вопрос:
При использовании вложенных параллельных операторов с n_jobs>1 во внешнем операторе вложенная параллельная функция, по-видимому, ограничена ресурсами, доступными из 1 потока вместо 4. Т. е. рассмотрите следующий сильно упрощенный сценарий для воспроизведения проблемы:
from joblib import Parallel, delayed
import numpy as np
def parallel_in_parallel_test(i):
a = np.ones((1000,1000))
for j in range(2000):
a *= np.random.randn(1000,1000)
return a.sum()
def parallel_in_parallel_wrapper(j, n_threads=4):
out2 = Parallel(n_jobs=n_threads)(delayed(parallel_in_parallel_test)(i) for i in range(100))
return np.array(out2).sum()
out = Parallel(n_jobs=3)(delayed(parallel_in_parallel_wrapper)(j, n_threads=4) for j in range(100))
Я бы ожидал, что 3 параллельных процесса производят 4 процесса, которые обеспечивают загрузку процессора на 400%, т. Е. 4 ядра/потока на «родительский процесс». Однако вместо этого эти 4 «дочерних процесса» выполняются не параллельно. Каждый «родительский процесс» использует 100% процессора вместо 400% (только одно ядро/поток).
Рассмотрим следующий снимок экрана с htop:
Я могу воспроизвести проблему в минималистичной среде:
conda create --name py39 python=3.9
conda activate py39
conda install numpy joblib
Моя ОС: Ubuntu 18.04.5 LTS
Не Вложенный подход неосуществим с моим конкретным кодом, поскольку вложенная параллель находится в методе класса.
Есть какие-либо предложения, как правильно использовать доступные ресурсы? Это просто простая ошибка?
Ответ №1:
По — видимому, изменение предела inner_max_num_threads серверной части, используемой во вложенном операторе, работает-мне все еще непонятно, почему это необходимо в первую очередь.
from joblib import Parallel, delayed, parallel_backend
import numpy as np
def parallel_in_parallel_test(i):
a = np.ones((1000,1000))
for j in range(2000):
a *= np.random.randn(1000,1000)
return a.sum()
def parallel_in_parallel_wrapper(j, n_threads=4):
with parallel_backend("loky", inner_max_num_threads=n_threads):
out2 = Parallel(n_jobs=n_threads)(delayed(parallel_in_parallel_test)(i) for i in range(100))
return np.array(out2).sum()
out = Parallel(n_jobs=3)(delayed(parallel_in_parallel_wrapper)(j, n_threads=4) for j in range(100))