Dask использует большую память при вычислении двух значений с общей зависимостью

#python #dask #dask-distributed #dask-delayed

#python #dask #dask-распределенный #dask-задержка

Вопрос:

Я использую Dask на одной машине ( LocalCluster с 4 процессами, 16 потоками, 68,56 ГБ памяти) и сталкиваюсь с проблемами рабочей памяти при попытке вычислить сразу два результата, которые разделяют зависимость.

В примере, показанном ниже, вычисления result только с одним вычислением выполняются нормально и быстро, при этом совокупное использование рабочей памяти составляет около 1 ГБ. Однако при вычислении results с помощью двух вычислений рабочие устройства быстро используют всю свою память и начинают запись на диск, когда общее использование памяти составляет около 40 ГБ. Вычисление в конечном итоге завершится, но, как и следовало ожидать, произойдет значительное замедление, как только начнется запись на диск.

Интуитивно понятно, что если считывается один фрагмент, а затем сразу вычисляются две его суммы, то этот фрагмент можно отбросить, а использование памяти останется низким. Однако, похоже, что Dask отдает приоритет загрузке данных вместо более поздних агрегированных вычислений, которые очищают память.

Любая помощь в понимании того, что здесь происходит, была бы весьма признательна. Как я могу вычислить два результата с общей зависимостью без необходимости дважды считывать базовые данные или полностью считывать их в память?

 import dask
import dask.dataframe as dd
import dask.array as da
from dask.distributed import Client

client = Client("localhost:8786")

array = da.random.normal(size=(int(1e9), 10), chunks=(int(1e6), 10))
df = dd.from_array(array, columns=[str(i) for i in range(10)])

# does not blow up worker memory, overall usage stays below 1GB total
result = dask.compute(df["0"].sum())

# does blow up worker memory
results = dask.compute([df["0"].sum(), df["1"].sum()])
 

Ответ №1:

Способ построения массива заключается в том, что каждый раз, когда создается фрагмент, он должен генерировать каждый столбец массива. Итак, одна из возможностей для оптимизации (если это возможно) — сгенерировать / загрузить массив таким образом, чтобы обеспечить обработку по столбцам. Это уменьшит нагрузку на память одной задачи.

Другим способом оптимизации является явное указание общих зависимостей, например dask.compute(df[['0', '1']].sum()) , будет работать эффективно.

Однако более важным моментом является то, что по умолчанию dask соблюдаются некоторые эмпирические правила о том, как расставлять приоритеты в работе, см. Здесь . У вас есть несколько вариантов вмешательства (не уверен, является ли этот список исчерпывающим): пользовательские приоритеты, ограничения ресурсов, изменение графика вычислений (чтобы позволить работникам освобождать память от промежуточных задач, не дожидаясь завершения окончательной задачи).

Простой способ изменить график — это разрушить зависимость между конечной суммой и всеми промежуточными задачами путем вычисления промежуточных сумм вручную:

 [results] = dask.compute([df["0"].map_partitions(sum), df["1"].map_partitions(sum)])
 

Обратите внимание, что это results будет список из двух подсписков, но вычислить сумму каждого подсписка тривиально (попытка запуска sum с отложенным объектом вызовет вычисление, поэтому более эффективно запускать sum после results вычисления).

Комментарии:

1. Большое вам спасибо за помощь! Ваши предложения привели меня к решению моей проблемы. Мои фактические вычисления были немного сложнее, чем суммирование, и я применял функцию к каждому элементу списка отложенных pd.DataFrame объектов, используя встроенную map функцию. Как и в случае с sum методом в моем примере выше, похоже map , что функция препятствовала эффективному разделению задачи. После переключения с map функции на цикл for или эквивалентное понимание списка система смогла правильно разделить задачу и обрабатывать вычисления без увеличения объема памяти.

2. Это здорово! С его помощью .visualize() можно было бы увидеть, есть ли дополнительные преимущества для оптимизации рабочего процесса.