#python #dask #dask-distributed #dask-dataframe
#python #dask #dask-распределенный #dask-dataframe
Вопрос:
Я использую Dask для сложной операции. Сначала я выполняю сокращение, которое создает df среднего размера (несколько МБ), который затем мне нужно передать каждому работнику для вычисления конечного результата, поэтому мой код выглядит примерно так
intermediate_result = ddf.reduction().compute()
final_result = ddf.reduction(
chunk=function, chunk_kwargs={"intermediate_result": intermediate_result}
)
Однако я получаю предупреждающее сообщение, которое выглядит следующим образом
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers
future = client.submit(func, big_data) # bad
big_future = client.scatter(big_data) # good
future = client.submit(func, big_future) # good
% (format_bytes(len(b)), s)
Я пытался сделать это
intermediate_result = client.scatter(intermediate_result, broadcast=True)
Но это не работает, поскольку функция теперь видит это как будущий объект, а не тип данных, которым он должен быть.
Кажется, я не могу найти никакой документации о том, как использовать разброс с сокращениями, кто-нибудь знает, как это сделать? Или я должен просто игнорировать предупреждающее сообщение и передавать df среднего размера таким, какой я есть?
Ответ №1:
На самом деле, лучшим решением, вероятно, является не разбрасывать ваш материализованный результат, а избегать его вычисления в первую очередь. Вы можете просто удалить .compute()
, что будет означать, что все вычисления выполняются за один этап, а результаты автоматически перемещаются туда, где они вам нужны.
В качестве альтернативы, если вы хотите иметь четкую границу между этапами, вы можете использовать
intermediate_result = ddf.reduction().persist()
который запустит сокращение и сохранит его на рабочих, не передавая его клиенту. Вы можете подождать завершения этого до следующего шага или нет.
Комментарии:
1. Спасибо @mdurant однако, несомненно, это приведет к тому, что моя функция
function
будет работать с фреймом данных dask и фреймом данных pandas (каждый раздел в сокращении). Мне нужно, чтобы они оба были фреймами данных при вызове функции2. Не думайте так, но попробуйте. Вы всегда можете
compute()
использовать агрегацию, если вам нужно.