dask.map_partitions(..).compute() с ожидаемым размером результата 2 ГБ вызывает ошибку памяти на локальном компьютере 16 ГБ

#python #dask

Вопрос:

Запуск кода в кластере Dask с map_partitions(...).compute() ожиданием результатов только cca размером 2 ГБ.

Удивительно, но это не удается MemoryError на моей локальной машине! Хотя я, безусловно, использую клиент для удаленного кластера, панель мониторинга удаленного кластера также показывает, что кластер занят этой задачей, и кластер также достигает своего пика потребления памяти (изначально я думал, что проблема заключается в памяти кластера).

Интересно, как вообще возможно, что проблема с памятью возникает в памяти локальной машины? Ожидаемый размер результата составляет около 2 ГБ, в то время как у моего ноутбука 16 ГБ.

 dask_client.upload_file(os.path.join(src_folder,'capacity.py')) result = (read_parquets_separately_by_dask_and_concatenate('hub_to_hub_capacity/2021/10/')  .map_partitions(capacity.capacity_features,  meta=meta_capacity_features,  transform_divisions=False) ).compute()  

В кластере Dask (развернутом в Azure) У меня есть досье на каждый час в течение месяца. Dask читает каждый файл независимо и объединяет их. И я ожидаю, что » map_partitions` будет обрабатывать каждый фрагмент данных файла. Тот же код за один день (24 файла) быстро запустился и в результате вернул 70 МБ pandas df.

Сообщение об ошибке показывает мои локальные пути к библиотекам. И я испытываю сбои в работе других приложений из-за проблем с памятью. Таким образом, несомненно, что проблема возникает локально:

 distributed.protocol.core - CRITICAL - Failed to Serialize Traceback (most recent call last): ...  frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)  File "C:Usersoleg.demidenko.virtualenvsinterc_predict-5QmpLbYYlibsite-packagesmsgpack__init__.py", line 35, in packb  return Packer(**kwargs).pack(o)  File "msgpack_packer.pyx", line 120, in msgpack._cmsgpack.Packer.__cinit__ MemoryError: Unable to allocate internal buffer. distributed.batched - ERROR - Error in batched write Traceback (most recent call last):  ...  File "C:Usersoleg.demidenko.virtualenvsinterc_predict-5QmpLbYYlibsite-packagesmsgpack__init__.py", line 35, in packb  return Packer(**kwargs).pack(o)  File "msgpack_packer.pyx", line 120, in msgpack._cmsgpack.Packer.__cinit__ MemoryError: Unable to allocate internal buffer.  

Ответ №1:

Для того чтобы Dask построил фрейм данных в вашем клиенте в результате a compute() , ему необходимо

  • загрузите набор результатов от работников, закодированных в виде байт-потоков
  • декодирование в фреймы данных pandas в памяти
  • вызовите pd.concatenate , чтобы объединить это в один вывод.

При объединении все составляющие фреймы данных должны находиться в памяти клиентов, и входящие байт-потоки, возможно, еще не были освобождены, следовательно, всплеск памяти. Была проделана работа, чтобы попытаться сделать шаги с нулевым копированием, но я не знаю, как здесь продвигается. С общедоступным API pandas сложно назначить ожидаемый фрейм данных и записать данные непосредственно в него (см. fastparquet делает это в сложном и подверженном ошибкам коде!).

На самом деле, a .compute() предназначен для значительно обобщенного конечного результата. Во всех рекомендациях по ограничению памяти работа с кадрами данных pandas (с dask или без него) предполагает, что у вас всегда должно быть «от нескольких до многих раз» (в зависимости от процесса) объем свободных данных в памяти, чтобы они могли работать бесперебойно.

-РЕДАКТИРОВАТЬ-

Обратная связь фактически предполагает, что проблема возникает у работника во время упаковки фрейма данных для отправки клиенту; но применяются те же аргументы.

Комментарии:

1. Я не получил последнюю правку и почему «применяются те же аргументы». Итак, работники или планировщик выполняют часть объединения результатов, и только какая-то часть остается от клиента? Можно ли устранить проблему, с которой я сталкиваюсь, обновив кластер или каким-то образом изменив код? Единственное, что я не могу изменить, — это мой локальный клиентский ноутбук. И это кажется странной проблемой, если ноутбук на 16 ГБ не может получить результаты на 2 ГБ.

2. Может быть, есть какой-то документ о том, что вы описали в своем ответе? Я попытался найти что-нибудь, но потерпел неудачу.

3. Пожалуйста, дайте больше информации о read_parquets_separately_by_dask_and_concatenate том, как и где он работает.

4. Можете ли вы сказать, происходит ли исключение у клиента или у работника?

5. Функция выполняется в кластере, и она примерно такая: dask.dataframe.concatenate([dask.read_parquet() for path in paths]) . Как я уже писал в вопросе: эти исключения показывают пути моего локального клиента (ноутбук). И другие приложения на моем ноутбуке выходят из строя из-за нехватки памяти.