#python #dask #dask-distributed
Вопрос:
Приведенный ниже код использует appx 1 сек для выполнения в системе с 8 процессорами. Как вручную настроить количество процессоров, используемых, dask.compute
например, до 4 процессоров, чтобы приведенный ниже код использовал appx 2 секунды для выполнения даже в системе с 8 процессорами?
import dask
from time import sleep
def f(x):
sleep(1)
return x**2
objs = [dask.delayed(f)(x) for x in range(8)]
print(dask.compute(*objs)) # (0, 1, 4, 9, 16, 25, 36, 49)
Ответ №1:
Есть несколько вариантов:
- укажите количество работников на момент создания кластера
from dask.distributed import Client
# without specifying unique thread, the function is executed
# on all threads
client = Client(n_workers=4, threads_per_worker=1)
# the rest of your code is not changed
- укажите, сколько (и какие) работников должны выполнять задачу
client = Client(n_workers=8, threads_per_worker=1)
list_workers = list(client.scheduler_info()['workers'])
client.compute(objs, workers=list_workers[:4])
# submit only to the first 4 workers
# note that workers should still be single-threaded, but the difference
# from option 1 is that you could in principle have more workers
# that are idle, also the `workers` kwarg can be passed to
# dask.compute rather than client.compute
- укажите семафор
from dask.distributed import Client, Semaphore
client = Client()
sem = Semaphore(max_leases=4, name="foo")
def fmodified(x, sem):
with sem:
return f(x)
objs = [dask.delayed(fmodified)(x, sem) for x in range(8)]
print(dask.compute(*objs)) # (0, 1, 4, 9, 16, 25, 36, 49)
Комментарии:
1. Вариант (1) по-прежнему использует все 8 процессоров (для выполнения требуется 1 сек.). Думая, что мне нужно измениться
dask.compute(*objs)
client.compute(*objs)
, но привело кTypeError: Truth of Delayed objects is not supported
. Для варианта (2) требовалсяclient
объект, который был создан на основе варианта (1), но затем привел кTypeError: compute() got multiple values for argument 'workers'
. Опция (3) работает, но не является предпочтительной из-за дополнительной функции. Какие-либо изменения в (1) или (2), чтобы запустить их?2. Также возникли перебои
RuntimeError
со всеми тремя вариантами:RuntimeError: An attempt has been made to start a new process before the current process has finished its bootstrapping phase. This probably means that you are not using fork to start your child processes and you have forgotten to use the proper idiom in the main module: if __name__ == '__main__': freeze_support() The "freeze_support()" line can be omitted if the program is not going to be frozen to produce an executable.
3. Хм, извини за это… позвольте мне проверить..
4. Я раньше не замечал однопоточного поведения… так что это интересно. Re:
freeze_support
сообщения, я никогда не видел, чтобы это происходило… это может быть связано с вашим кодом/настройкой (возможно, вы работаете в Windows).5. Да, работает в Windows. Никогда не видел этого сообщения в одной и той же системе при всем предыдущем использовании
dask
только импортаdelayed
иcompute
объектов изdask
. Это былоfrom dask.distributed import Client, LocalCluster
и использование тех объектов, которые периодически приводили кfreeze_support
сообщениям…