#python #dask #dask-distributed #dask-delayed
#python #dask #dask-распределенный #dask-задержка
Вопрос:
Я использую Dask на одной машине ( LocalCluster
с 4 процессами, 16 потоками, 68,56 ГБ памяти) и сталкиваюсь с проблемами рабочей памяти при попытке вычислить сразу два результата, которые разделяют зависимость.
В примере, показанном ниже, вычисления result
только с одним вычислением выполняются нормально и быстро, при этом совокупное использование рабочей памяти составляет около 1 ГБ. Однако при вычислении results
с помощью двух вычислений рабочие устройства быстро используют всю свою память и начинают запись на диск, когда общее использование памяти составляет около 40 ГБ. Вычисление в конечном итоге завершится, но, как и следовало ожидать, произойдет значительное замедление, как только начнется запись на диск.
Интуитивно понятно, что если считывается один фрагмент, а затем сразу вычисляются две его суммы, то этот фрагмент можно отбросить, а использование памяти останется низким. Однако, похоже, что Dask отдает приоритет загрузке данных вместо более поздних агрегированных вычислений, которые очищают память.
Любая помощь в понимании того, что здесь происходит, была бы весьма признательна. Как я могу вычислить два результата с общей зависимостью без необходимости дважды считывать базовые данные или полностью считывать их в память?
import dask
import dask.dataframe as dd
import dask.array as da
from dask.distributed import Client
client = Client("localhost:8786")
array = da.random.normal(size=(int(1e9), 10), chunks=(int(1e6), 10))
df = dd.from_array(array, columns=[str(i) for i in range(10)])
# does not blow up worker memory, overall usage stays below 1GB total
result = dask.compute(df["0"].sum())
# does blow up worker memory
results = dask.compute([df["0"].sum(), df["1"].sum()])
Ответ №1:
Способ построения массива заключается в том, что каждый раз, когда создается фрагмент, он должен генерировать каждый столбец массива. Итак, одна из возможностей для оптимизации (если это возможно) — сгенерировать / загрузить массив таким образом, чтобы обеспечить обработку по столбцам. Это уменьшит нагрузку на память одной задачи.
Другим способом оптимизации является явное указание общих зависимостей, например dask.compute(df[['0', '1']].sum())
, будет работать эффективно.
Однако более важным моментом является то, что по умолчанию dask
соблюдаются некоторые эмпирические правила о том, как расставлять приоритеты в работе, см. Здесь . У вас есть несколько вариантов вмешательства (не уверен, является ли этот список исчерпывающим): пользовательские приоритеты, ограничения ресурсов, изменение графика вычислений (чтобы позволить работникам освобождать память от промежуточных задач, не дожидаясь завершения окончательной задачи).
Простой способ изменить график — это разрушить зависимость между конечной суммой и всеми промежуточными задачами путем вычисления промежуточных сумм вручную:
[results] = dask.compute([df["0"].map_partitions(sum), df["1"].map_partitions(sum)])
Обратите внимание, что это results
будет список из двух подсписков, но вычислить сумму каждого подсписка тривиально (попытка запуска sum
с отложенным объектом вызовет вычисление, поэтому более эффективно запускать sum
после results
вычисления).
Комментарии:
1. Большое вам спасибо за помощь! Ваши предложения привели меня к решению моей проблемы. Мои фактические вычисления были немного сложнее, чем суммирование, и я применял функцию к каждому элементу списка отложенных
pd.DataFrame
объектов, используя встроеннуюmap
функцию. Как и в случае сsum
методом в моем примере выше, похожеmap
, что функция препятствовала эффективному разделению задачи. После переключения сmap
функции на цикл for или эквивалентное понимание списка система смогла правильно разделить задачу и обрабатывать вычисления без увеличения объема памяти.2. Это здорово! С его помощью
.visualize()
можно было бы увидеть, есть ли дополнительные преимущества для оптимизации рабочего процесса.