Dask как распределить данные при выполнении сокращения

#python #dask #dask-distributed #dask-dataframe

#python #dask #dask-распределенный #dask-dataframe

Вопрос:

Я использую Dask для сложной операции. Сначала я выполняю сокращение, которое создает df среднего размера (несколько МБ), который затем мне нужно передать каждому работнику для вычисления конечного результата, поэтому мой код выглядит примерно так

 intermediate_result = ddf.reduction().compute()

final_result = ddf.reduction(
    chunk=function, chunk_kwargs={"intermediate_result": intermediate_result}
)
 

Однако я получаю предупреждающее сообщение, которое выглядит следующим образом

 Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  % (format_bytes(len(b)), s)
 

Я пытался сделать это

 intermediate_result = client.scatter(intermediate_result, broadcast=True)
 

Но это не работает, поскольку функция теперь видит это как будущий объект, а не тип данных, которым он должен быть.
Кажется, я не могу найти никакой документации о том, как использовать разброс с сокращениями, кто-нибудь знает, как это сделать? Или я должен просто игнорировать предупреждающее сообщение и передавать df среднего размера таким, какой я есть?

Ответ №1:

На самом деле, лучшим решением, вероятно, является не разбрасывать ваш материализованный результат, а избегать его вычисления в первую очередь. Вы можете просто удалить .compute() , что будет означать, что все вычисления выполняются за один этап, а результаты автоматически перемещаются туда, где они вам нужны.

В качестве альтернативы, если вы хотите иметь четкую границу между этапами, вы можете использовать

 intermediate_result = ddf.reduction().persist()
 

который запустит сокращение и сохранит его на рабочих, не передавая его клиенту. Вы можете подождать завершения этого до следующего шага или нет.

Комментарии:

1. Спасибо @mdurant однако, несомненно, это приведет к тому, что моя функция function будет работать с фреймом данных dask и фреймом данных pandas (каждый раздел в сокращении). Мне нужно, чтобы они оба были фреймами данных при вызове функции

2. Не думайте так, но попробуйте. Вы всегда можете compute() использовать агрегацию, если вам нужно.