Как передать несколько аргументов в dask.distributed.Client().map?

#dask #dask-distributed

#dask #dask-распределенный

Вопрос:

 import dask.distributed
def f(x, y):
    return x, y
client = dask.distributed.Client()
client.map(f, [(1, 2), (2, 3)])
  

Не работает.

 [<Future: status: pending, key: f-137239e2f6eafbe900c0087f550bc0ca>,
 <Future: status: pending, key: f-64f918a0c730c63955da91694fcf7acc>]

distributed.worker - WARNING -  Compute Failed
Function:  f
args:      ((1, 2))
kwargs:    {}
Exception: TypeError("f() missing 1 required positional argument: 'y'",)

distributed.worker - WARNING -  Compute Failed
Function:  f
args:      ((2, 3))
kwargs:    {}
Exception: TypeError("f() missing 1 required positional argument: 'y'",)
  

Ответ №1:

У вас не совсем правильная подпись — возможно, документ неясен (предложения приветствуются). Client.map() принимает (переменное количество) наборов аргументов для каждой отправленной задачи, а не одну итеративную вещь. Вы должны сформулировать это как

 client.map(f, (1, 2), (2, 3))
  

или, если вы хотели оставаться ближе к своему исходному шаблону

 client.map(f, *[(1, 2), (2, 3)])
  

Комментарии:

1. И есть ли способ передавать kwargs как в списке dicts.

2. Нет, я так не думаю, вы можете передавать kwargs только для всех вызовов функций. Вам придется немного изменить свою целевую функцию и включить дополнительные параметры в аргументы, как указано выше.

3. @mdurant Я все еще не понимаю, ваш пример работает, но client.map(f, (1, 2), (2, 3), (3, 4)) нет.

4. Эти два действительно выглядят одинаково, но Dask особенно заботится о кортежах, поэтому где-то должна быть проверка типа.

5. @mdurant и client.map(f, (1, 2, 3), (2, 3, 4)) работает нормально.

Ответ №2:

Хорошо, в документации определенно немного запутано с этим. И я не смог найти пример, который четко продемонстрировал бы эту проблему. Итак, позвольте мне описать это ниже:

 def test_fn(a, b, c, d, **kwargs):
    return a   b   c   d   kwargs["special"]

futures = client.map(test_fn, *[[1, 2, 3, 4], (1, 2, 3, 4), (1, 2, 3, 4), (1, 2, 3, 4)], special=100)
output = [f.result() for f in futures]
# output = [104, 108, 112, 116]

futures = client.map(test_fn, [1, 2, 3, 4], (1, 2, 3, 4), (1, 2, 3, 4), (1, 2, 3, 4), special=100)
output = [f.result() for f in futures]
# output = [104, 108, 112, 116]
  

На что следует обратить внимание:

  1. Не имеет значения, используете ли вы списки или кортежи. И, как я сделал выше, вы можете смешивать их.
  2. Вы должны сгруппировать аргументы по их положению. Итак, если вы передаете 4 набора аргументов, первый список будет содержать первый аргумент из всех 4 наборов. (В этом случае «первый» вызов test_fn получает a=b=c=d=1.)
  3. Дополнительные **kwargs (подобные special ) передаются через функцию. Но это будет одно и то же значение для всех вызовов функций.

Теперь, когда я думаю об этом, это не так удивительно. Я думаю, что это просто следует за concurrent.futures Python.Подпись ProcessPoolExecutor.map().

PS. Обратите внимание, что, хотя в документации указано «Возвращает:
список, итератор или очередь фьючерсов, в зависимости от типа входных данных»., на самом деле вы можете получить эту ошибку: Dask no longer supports mapping over Iterators or Queues. Consider using a normal for loop and Client.submit