Выстраивание рабочих в очередь в Даске

#python #dask #dask-distributed #dask-delayed

Вопрос:

У меня есть следующий сценарий, который мне нужно решить с помощью планировщика Dask и рабочих:

  • Программа Dask имеет N функций, вызываемых в цикле (N определяется пользователем)
  • Каждая функция запускается delayed(func)(args) для параллельного выполнения.
  • Когда запускается каждая функция из предыдущей точки, она запускает W рабочих. Вот как я призываю работников:
     futures = client.map(worker_func, worker_args)     
    worker_responses = client.gather(futures)
     

Это означает, что мне нужны N * W работников, чтобы запускать все параллельно. Проблема в том, что это не оптимально, так как это слишком большое распределение ресурсов, я запускаю его в облаке, и это дорого. Кроме того, N определяется пользователем, поэтому я заранее не знаю, какими возможностями обработки мне нужно обладать.

Есть ли способ поставить работников в очередь таким образом, чтобы, если я определю, что в Dask есть X работников, когда один работник заканчивается, начинается следующий?

Комментарии:

1. Я думаю, что у вас неправильное представление о том, как ведет себя клиент, что и является источником вашей проблемы! в частности, когда вы ссылаетесь на работников и работаете параллельно, вы, вероятно, имеете в виду одно и то же, поскольку клиент ведет себя как параллельный исполнитель.futures , а также обрабатывает результирующие фьючерсы из вашего .map() приложения

2. Допустим, у меня есть 100 доступных работников, один планировщик и приложение, которое вызывает планировщик. Это приложение вызывает 5 функций, каждая из которых имеет delayed . Каждая из этих функций вызывает 50 работников (в общей сложности приложение должно запускать 250 работников). Поскольку 250 больше 100, мне нужно знать, как Dask distributed обработает запрос на запуск рабочего номера 101. Будет ли Даск ждать, пока один из 100 занятых работников не закончит, чтобы позвонить следующему работнику?

3. Я бы не стал создавать дополнительных работников и просто позволил планировщику планировать работу с доступными — да, Dask будет планировать работу среди доступных работников, когда задач больше, чем рабочих

4. Я нахожу, что вы путаете «рабочий» (процесс, запущенный на машине, которая может выполнять задачи) и «задача» (вызов функции с аргументами). Планировщик распределяет задачи и не запускает рабочих. Менеджер кластера может запускать рабочих, но я не думаю, что вы используете это. Отправка 250 задач 100 работникам-это совершенно нормально, планировщик будет запускать новые задачи по мере того, как работники станут свободными.

5. @mdurant Теперь я понимаю, спасибо за объяснение

Ответ №1:

Сначала определите необходимое вам количество работников, относитесь к ним как к эфемерным, но статичным на протяжении всего процесса обработки
Вы можете создавать их динамически (при запуске или позже), но, вероятно, захотите, чтобы все они были готовы в самом начале обработки

С вашей точки зрения, клиент является исполнителем (поэтому, когда вы ссылаетесь на работников и работаете параллельно, вы, вероятно, имеете в виду одно и то же

Этот класс напоминает исполнителей, concurrent.futures но также допускает Future объекты в submit/map вызовах. Когда создается экземпляр клиента, он берет на себя все dask.compute и dask.persist вызывает по умолчанию.

Как только ваши работники будут доступны, Dask распределит работу, предоставленную им через планировщик

Вы должны выполнять любые задачи, которые зависят друг от друга, передавая результат dask.delayed() с помощью предшествующей функции result (которая является будущим, а еще не результатом)
. Это будущее в качестве аргументов позволит Dask построить график задач вашей работы

Пример использования https://examples.dask.org/delayed.html
Ссылка на будущее https://docs.dask.org/en/latest/futures.html#distributed.Будущее

Зависимое будущее с даском.отложено

Вот полный пример из отложенных документов (на самом деле объединяет несколько последовательных примеров с одним и тем же результатом)

 import dask
from dask.distributed import Client

client = Client(...)  # connect to distributed cluster

def inc(x):
    return x   1

def double(x):
    return x * 2

def add(x, y):
    return x   y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)
    c = dask.delayed(add)(a, b)    # depends on a and b
    output.append(c)

total = dask.delayed(sum)(output)  # depends on everything
total.compute()  # 45
 

Вы можете позвонить total.visualize() , чтобы просмотреть график задач

График задач из документов Dask
(изображение из документов Dask с задержкой)

Коллекции фьючерсов

Если вы уже используете .map(..) для сопоставления пар функций и аргументов, вы можете продолжать создавать фьючерсы, а затем .gather(..) их все сразу, даже если они находятся в коллекции (что удобно для вас здесь).

.gather() «Эд results » будет располагаться в том же порядке, в каком они были даны (список списков).

 [[fn1(args11), fn1(args12)], [fn2(args21)], [fn3(args31), fn3(args32), fn3(args33)]]
 

https://distributed.dask.org/en/latest/api.html#distributed.Клиент.соберите

 import dask
from dask.distributed import Client

client = Client(...)  # connect to distributed cluster

collection_of_futures = []

for worker_func, worker_args in iterable_of_pairs_of_fn_args:
    futures = client.map(worker_func, worker_args)
    collection_of_futures.append(futures)

results = client.gather(collection_of_futures)
 

Примечания

  • worker_args должно быть какое-то повторяющееся отображение worker_func , которое может быть источником ошибок
  • .gather() ing будет блокироваться до тех пор, пока все фьючерсы не будут завершены или не повысятся

.as_completed()

Если вам нужны результаты как можно быстрее, вы можете использовать .as_completed(..) , но обратите внимание, что результаты будут в недетерминированном порядке, поэтому я не думаю, что это имеет смысл для вашего случая .. если вы обнаружите, что это так, вам понадобятся некоторые дополнительные гарантии

  • включите в результат информацию о том, что делать с результатом
  • держите ссылку на каждый из них и проверяйте их
  • объединяйте группы только там, где это не имеет значения (т. е. все фьючерсы имеют одну и ту же цель)

также обратите внимание, что данные фьючерсы завершены, но все еще являются будущим, поэтому вам все равно нужно позвонить .result() или .gather() им

https://distributed.dask.org/en/latest/api.html#distributed.as_completed

Комментарии:

1. Я обращаюсь к работникам с распределенным dask: futures = client.map(worker_func, worker_args) worker_responses = client.gather(futures) как я могу включить в это задержку?

2. хм.. если вы так расположите свой ввод, он уже должен создавать коллекцию фьючерсов, а затем собирать результат — если вы не хотите сразу получать результат (т. Е. У вас много функций и пар worker_args), соберите результат каждого .map(..) в список, а затем .gather() их всех сразу — позвольте мне обновить мой ответ