#python #dask
Вопрос:
Запуск кода в кластере Dask с map_partitions(...).compute()
ожиданием результатов только cca размером 2 ГБ.
Удивительно, но это не удается MemoryError
на моей локальной машине! Хотя я, безусловно, использую клиент для удаленного кластера, панель мониторинга удаленного кластера также показывает, что кластер занят этой задачей, и кластер также достигает своего пика потребления памяти (изначально я думал, что проблема заключается в памяти кластера).
Интересно, как вообще возможно, что проблема с памятью возникает в памяти локальной машины? Ожидаемый размер результата составляет около 2 ГБ, в то время как у моего ноутбука 16 ГБ.
dask_client.upload_file(os.path.join(src_folder,'capacity.py')) result = (read_parquets_separately_by_dask_and_concatenate('hub_to_hub_capacity/2021/10/') .map_partitions(capacity.capacity_features, meta=meta_capacity_features, transform_divisions=False) ).compute()
В кластере Dask (развернутом в Azure) У меня есть досье на каждый час в течение месяца. Dask читает каждый файл независимо и объединяет их. И я ожидаю, что » map_partitions` будет обрабатывать каждый фрагмент данных файла. Тот же код за один день (24 файла) быстро запустился и в результате вернул 70 МБ pandas df.
Сообщение об ошибке показывает мои локальные пути к библиотекам. И я испытываю сбои в работе других приложений из-за проблем с памятью. Таким образом, несомненно, что проблема возникает локально:
distributed.protocol.core - CRITICAL - Failed to Serialize Traceback (most recent call last): ... frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True) File "C:Usersoleg.demidenko.virtualenvsinterc_predict-5QmpLbYYlibsite-packagesmsgpack__init__.py", line 35, in packb return Packer(**kwargs).pack(o) File "msgpack_packer.pyx", line 120, in msgpack._cmsgpack.Packer.__cinit__ MemoryError: Unable to allocate internal buffer. distributed.batched - ERROR - Error in batched write Traceback (most recent call last): ... File "C:Usersoleg.demidenko.virtualenvsinterc_predict-5QmpLbYYlibsite-packagesmsgpack__init__.py", line 35, in packb return Packer(**kwargs).pack(o) File "msgpack_packer.pyx", line 120, in msgpack._cmsgpack.Packer.__cinit__ MemoryError: Unable to allocate internal buffer.
Ответ №1:
Для того чтобы Dask построил фрейм данных в вашем клиенте в результате a compute()
, ему необходимо
- загрузите набор результатов от работников, закодированных в виде байт-потоков
- декодирование в фреймы данных pandas в памяти
- вызовите
pd.concatenate
, чтобы объединить это в один вывод.
При объединении все составляющие фреймы данных должны находиться в памяти клиентов, и входящие байт-потоки, возможно, еще не были освобождены, следовательно, всплеск памяти. Была проделана работа, чтобы попытаться сделать шаги с нулевым копированием, но я не знаю, как здесь продвигается. С общедоступным API pandas сложно назначить ожидаемый фрейм данных и записать данные непосредственно в него (см. fastparquet делает это в сложном и подверженном ошибкам коде!).
На самом деле, a .compute()
предназначен для значительно обобщенного конечного результата. Во всех рекомендациях по ограничению памяти работа с кадрами данных pandas (с dask или без него) предполагает, что у вас всегда должно быть «от нескольких до многих раз» (в зависимости от процесса) объем свободных данных в памяти, чтобы они могли работать бесперебойно.
-РЕДАКТИРОВАТЬ-
Обратная связь фактически предполагает, что проблема возникает у работника во время упаковки фрейма данных для отправки клиенту; но применяются те же аргументы.
Комментарии:
1. Я не получил последнюю правку и почему «применяются те же аргументы». Итак, работники или планировщик выполняют часть объединения результатов, и только какая-то часть остается от клиента? Можно ли устранить проблему, с которой я сталкиваюсь, обновив кластер или каким-то образом изменив код? Единственное, что я не могу изменить, — это мой локальный клиентский ноутбук. И это кажется странной проблемой, если ноутбук на 16 ГБ не может получить результаты на 2 ГБ.
2. Может быть, есть какой-то документ о том, что вы описали в своем ответе? Я попытался найти что-нибудь, но потерпел неудачу.
3. Пожалуйста, дайте больше информации о
read_parquets_separately_by_dask_and_concatenate
том, как и где он работает.4. Можете ли вы сказать, происходит ли исключение у клиента или у работника?
5. Функция выполняется в кластере, и она примерно такая:
dask.dataframe.concatenate([dask.read_parquet() for path in paths])
. Как я уже писал в вопросе: эти исключения показывают пути моего локального клиента (ноутбук). И другие приложения на моем ноутбуке выходят из строя из-за нехватки памяти.