#dask #dask-distributed
#dask #dask-распределенный
Вопрос:
import dask.distributed
def f(x, y):
return x, y
client = dask.distributed.Client()
client.map(f, [(1, 2), (2, 3)])
Не работает.
[<Future: status: pending, key: f-137239e2f6eafbe900c0087f550bc0ca>,
<Future: status: pending, key: f-64f918a0c730c63955da91694fcf7acc>]
distributed.worker - WARNING - Compute Failed
Function: f
args: ((1, 2))
kwargs: {}
Exception: TypeError("f() missing 1 required positional argument: 'y'",)
distributed.worker - WARNING - Compute Failed
Function: f
args: ((2, 3))
kwargs: {}
Exception: TypeError("f() missing 1 required positional argument: 'y'",)
Ответ №1:
У вас не совсем правильная подпись — возможно, документ неясен (предложения приветствуются). Client.map()
принимает (переменное количество) наборов аргументов для каждой отправленной задачи, а не одну итеративную вещь. Вы должны сформулировать это как
client.map(f, (1, 2), (2, 3))
или, если вы хотели оставаться ближе к своему исходному шаблону
client.map(f, *[(1, 2), (2, 3)])
Комментарии:
1. И есть ли способ передавать kwargs как в списке dicts.
2. Нет, я так не думаю, вы можете передавать kwargs только для всех вызовов функций. Вам придется немного изменить свою целевую функцию и включить дополнительные параметры в аргументы, как указано выше.
3. @mdurant Я все еще не понимаю, ваш пример работает, но
client.map(f, (1, 2), (2, 3), (3, 4))
нет.4. Эти два действительно выглядят одинаково, но Dask особенно заботится о кортежах, поэтому где-то должна быть проверка типа.
5. @mdurant и
client.map(f, (1, 2, 3), (2, 3, 4))
работает нормально.
Ответ №2:
Хорошо, в документации определенно немного запутано с этим. И я не смог найти пример, который четко продемонстрировал бы эту проблему. Итак, позвольте мне описать это ниже:
def test_fn(a, b, c, d, **kwargs):
return a b c d kwargs["special"]
futures = client.map(test_fn, *[[1, 2, 3, 4], (1, 2, 3, 4), (1, 2, 3, 4), (1, 2, 3, 4)], special=100)
output = [f.result() for f in futures]
# output = [104, 108, 112, 116]
futures = client.map(test_fn, [1, 2, 3, 4], (1, 2, 3, 4), (1, 2, 3, 4), (1, 2, 3, 4), special=100)
output = [f.result() for f in futures]
# output = [104, 108, 112, 116]
На что следует обратить внимание:
- Не имеет значения, используете ли вы списки или кортежи. И, как я сделал выше, вы можете смешивать их.
- Вы должны сгруппировать аргументы по их положению. Итак, если вы передаете 4 набора аргументов, первый список будет содержать первый аргумент из всех 4 наборов. (В этом случае «первый» вызов
test_fn
получает a=b=c=d=1.) - Дополнительные
**kwargs
(подобныеspecial
) передаются через функцию. Но это будет одно и то же значение для всех вызовов функций.
Теперь, когда я думаю об этом, это не так удивительно. Я думаю, что это просто следует за concurrent.futures Python.Подпись ProcessPoolExecutor.map().
PS. Обратите внимание, что, хотя в документации указано «Возвращает:
список, итератор или очередь фьючерсов, в зависимости от типа входных данных»., на самом деле вы можете получить эту ошибку: Dask no longer supports mapping over Iterators or Queues. Consider using a normal for loop and Client.submit