Joblib вложенное параллельное выполнение без использования доступных ядер

#python-3.x #multithreading #parallel-processing #nested #joblib

Вопрос:

При использовании вложенных параллельных операторов с n_jobs>1 во внешнем операторе вложенная параллельная функция, по-видимому, ограничена ресурсами, доступными из 1 потока вместо 4. Т. е. рассмотрите следующий сильно упрощенный сценарий для воспроизведения проблемы:

 from joblib import Parallel, delayed
import numpy as np

def parallel_in_parallel_test(i):
    a = np.ones((1000,1000))
    for j in range(2000):
        a *= np.random.randn(1000,1000)
    return a.sum()

def parallel_in_parallel_wrapper(j, n_threads=4):
    out2 = Parallel(n_jobs=n_threads)(delayed(parallel_in_parallel_test)(i) for i in range(100))
    return np.array(out2).sum()

out = Parallel(n_jobs=3)(delayed(parallel_in_parallel_wrapper)(j, n_threads=4) for j in range(100))
 

Я бы ожидал, что 3 параллельных процесса производят 4 процесса, которые обеспечивают загрузку процессора на 400%, т. Е. 4 ядра/потока на «родительский процесс». Однако вместо этого эти 4 «дочерних процесса» выполняются не параллельно. Каждый «родительский процесс» использует 100% процессора вместо 400% (только одно ядро/поток).

Рассмотрим следующий снимок экрана с htop: снимок экрана htop, показывающий низкое количество активных потоков и большое количество неактивных потоков

Я могу воспроизвести проблему в минималистичной среде:

 conda create --name py39 python=3.9
conda activate py39
conda install numpy joblib
 

Моя ОС: Ubuntu 18.04.5 LTS

Не Вложенный подход неосуществим с моим конкретным кодом, поскольку вложенная параллель находится в методе класса.

Есть какие-либо предложения, как правильно использовать доступные ресурсы? Это просто простая ошибка?

Ответ №1:

По — видимому, изменение предела inner_max_num_threads серверной части, используемой во вложенном операторе, работает-мне все еще непонятно, почему это необходимо в первую очередь.

 from joblib import Parallel, delayed, parallel_backend
import numpy as np

def parallel_in_parallel_test(i):
    a = np.ones((1000,1000))
    for j in range(2000):
        a *= np.random.randn(1000,1000)
    return a.sum()

def parallel_in_parallel_wrapper(j, n_threads=4):
    with parallel_backend("loky", inner_max_num_threads=n_threads):
        out2 = Parallel(n_jobs=n_threads)(delayed(parallel_in_parallel_test)(i) for i in range(100))
    return np.array(out2).sum()

out = Parallel(n_jobs=3)(delayed(parallel_in_parallel_wrapper)(j, n_threads=4) for j in range(100))