предельное количество процессоров, используемых dask compute

#python #dask #dask-distributed

Вопрос:

Приведенный ниже код использует appx 1 сек для выполнения в системе с 8 процессорами. Как вручную настроить количество процессоров, используемых, dask.compute например, до 4 процессоров, чтобы приведенный ниже код использовал appx 2 секунды для выполнения даже в системе с 8 процессорами?

 import dask
from time import sleep

def f(x):
    sleep(1)
    return x**2

objs = [dask.delayed(f)(x) for x in range(8)]
print(dask.compute(*objs))  # (0, 1, 4, 9, 16, 25, 36, 49)
 

Ответ №1:

Есть несколько вариантов:

  1. укажите количество работников на момент создания кластера
 from dask.distributed import Client

# without specifying unique thread, the function is executed
# on all threads
client = Client(n_workers=4, threads_per_worker=1)

# the rest of your code is not changed
 
  1. укажите, сколько (и какие) работников должны выполнять задачу
 
client = Client(n_workers=8, threads_per_worker=1)

list_workers = list(client.scheduler_info()['workers'])

client.compute(objs, workers=list_workers[:4]) 

# submit only to the first 4 workers
# note that workers should still be single-threaded, but the difference
# from option 1 is that you could in principle have more workers
# that are idle, also the `workers` kwarg can be passed to
# dask.compute rather than client.compute
 
  1. укажите семафор
 from dask.distributed import Client, Semaphore

client = Client()
sem = Semaphore(max_leases=4, name="foo")

def fmodified(x, sem):
    with sem:
        return f(x)

objs = [dask.delayed(fmodified)(x, sem) for x in range(8)]
print(dask.compute(*objs))  # (0, 1, 4, 9, 16, 25, 36, 49)
 

Комментарии:

1. Вариант (1) по-прежнему использует все 8 процессоров (для выполнения требуется 1 сек.). Думая, что мне нужно измениться dask.compute(*objs) client.compute(*objs) , но привело к TypeError: Truth of Delayed objects is not supported . Для варианта (2) требовался client объект, который был создан на основе варианта (1), но затем привел к TypeError: compute() got multiple values for argument 'workers' . Опция (3) работает, но не является предпочтительной из-за дополнительной функции. Какие-либо изменения в (1) или (2), чтобы запустить их?

2. Также возникли перебои RuntimeError со всеми тремя вариантами: RuntimeError: An attempt has been made to start a new process before the current process has finished its bootstrapping phase. This probably means that you are not using fork to start your child processes and you have forgotten to use the proper idiom in the main module: if __name__ == '__main__': freeze_support() The "freeze_support()" line can be omitted if the program is not going to be frozen to produce an executable.

3. Хм, извини за это… позвольте мне проверить..

4. Я раньше не замечал однопоточного поведения… так что это интересно. Re: freeze_support сообщения, я никогда не видел, чтобы это происходило… это может быть связано с вашим кодом/настройкой (возможно, вы работаете в Windows).

5. Да, работает в Windows. Никогда не видел этого сообщения в одной и той же системе при всем предыдущем использовании dask только импорта delayed и compute объектов из dask . Это было from dask.distributed import Client, LocalCluster и использование тех объектов, которые периодически приводили к freeze_support сообщениям…