Оптимизация ресурсов Spark, чтобы избежать использования памяти и пространства

#apache-spark #pyspark #amazon-emr

#apache-spark #pyspark #amazon-emr

Вопрос:

У меня есть набор данных объемом около 190 ГБ, который был разделен на 1000 разделов.

мой кластер EMR поддерживает максимум 10 r5a.2xlarge узлов ЗАДАЧ и 2 ОСНОВНЫХ узла. Каждый узел имеет 64 ГБ памяти и 128 ГБ хранилища EBS.

При выполнении задания spark я настроил его на использование executor-cores 5 , driver cores 5 , executor-memory 40g , driver-memory 50g , spark.yarn.executor.memoryOverhead=10g , spark.sql.shuffle.partitions=500 , spark.dynamicAllocation.enabled=true

Но моя работа продолжает терпеть неудачу с такими ошибками, как

 spark.shuffle.MetadataFetchFailedException
spark.shuffle.FetchFailedException
java.io.IOException: No space left on device
Container Lost
etc...
 

Многие ответы на подобные вопросы, которые я нашел в Интернете, говорят об увеличении объема памяти. Что я и сделал, с 2G до 10G. Мой общий executor memory и memoryOverhead составляет 50G. 40 ГБ выделено исполнителю, а 10 ГБ — служебным ресурсам. Но я думаю, что я достигаю предела, так как я не смогу подняться выше 56.

Я думал, что сделал все возможное, чтобы оптимизировать свою работу spark:

  1. Увеличение разделов
  2. Увеличьте spark.sql.shuffle.partitions
  3. Увеличьте исполнительную и служебную память

Но моя работа по-прежнему терпит неудачу. Есть ли что-нибудь еще, что я могу попробовать? Должен ли я увеличить свои накладные расходы еще больше, чтобы моя исполнительная память / служебная память составляла 50/50? Профиль памяти моей работы из ganglia выглядит примерно так:

(Резкое падение происходит, когда кластер сбрасывает все узлы-исполнители из-за того, что они мертвы)

введите описание изображения здесь

Любая информация будет высоко оценена

Спасибо

РЕДАКТИРОВАТЬ: [РЕШЕНИЕ]

Я добавляю к своему сообщению точное решение, которое решило мою проблему Debuggerrr благодаря его предложениям в его ответе.

  • У меня был большой фрейм данных, который я повторно использовал после выполнения многих вычислений на других фреймах данных. Используя persist() метод (предложенный Debuggerrr), я смог сохранить это в ПАМЯТИ и на ДИСКЕ и просто вызвать его обратно, не очищая его части с помощью GC.
  • Я также следовал рекомендациям в блоге Debuggerrr, упомянутым в его ответе, и вычислил правильную память исполнителя, количество исполнителей и т. Д. Но то, что я не смог сделать, это отключить spark.dynamicAllocation.enabled . В блоге говорится, что лучше всего установить для свойства значение false, если мы вычисляем ресурсы вручную, поскольку spark имеет тенденцию неправильно распределять ресурсы, если ваши вычисления не совпадают с ним. Как только я установил для него значение false и установил правильные атрибуты executor и spark, это сработало как шарм!

[РЕДАКТИРОВАТЬ 2]: параметры, которые специально работали для моей работы:

 --executor-cores 5 --driver-cores 5 --executor-memory 44g --driver-memory 44g --num-executors 9 --conf spark.default.parallelism=100 --conf spark.sql.shuffle.partitions=300 --conf spark.yarn.executor.memoryOverhead=11g --conf spark.shuffle.io.retryWait=180s --conf spark.network.timeout=800s --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.dynamicAllocation.enabled=false
 

Ответ №1:

Вы можете попробовать любой из следующих шагов:

  1. Memory overhead должно быть 10% из памяти исполнителя или 328 MB . Не увеличивайте его до какого-либо значения.
  2. Удалите ядра драйверов.
  3. Если у вас 10 узлов, укажите number of executors . Вы должны рассчитать его таким образом, чтобы оставить немного места для YARN и фоновых процессов. Кроме того, вы можете попробовать увеличить еще 1 или 2 ядра.
  4. Запустите его в cluster режиме, и какой бы номер вы ни назначили исполнителям, добавьте к нему 1, поскольку 1 исполнитель будет рассматриваться как исполнитель драйвера в режиме кластера.
  5. Кроме того, последнее — это не что иное, как ваш код, написанный для отправки / обработки этого файла объемом 190 ГБ. Просмотрите свой код и найдите способы его оптимизации. Ищите методы сбора или ненужное использование объединений, объединение / перераспределение. Найдите какие-нибудь альтернативы, если это не нужно.
  6. Используйте параметр persist (только для памяти и диска) для фреймов данных, которые вы часто используете в коде.
  7. Также последнее, что я пробовал, это выполнить шаги вручную spark-shell on EMR , и вы узнаете, какая часть кода занимает много времени для запуска.

Вы также можете обратиться к этому официальному блогу за некоторыми советами.

Комментарии:

1. Спасибо за эти идеи!. Я использую spark.dynamicAllocation.enabled=true , поэтому я не указываю количество исполнителей, поскольку об этом позаботится пряжа. Я видел, что количество исполнителей увеличилось до 11. Я запускаю его только в режиме кластера, и у меня нет никакого метода сбора в моем коде. Итак, я предполагаю, что я уже выполняю пункты 3,4 и 5, как вы упомянули. Я часто использую фрейм данных, я могу попробовать опцию сохранения.

2. (Продолжение комментария выше) Для пункта № 7 я протестировал свой код на очень небольшом подмножестве в jupiterlab notebook, и он работает нормально. Однако я думаю, что мой набор данных сильно искажен. Есть ли способ проверить асимметрию?

3. Суть упоминания num-executors параметра заключается в том, что даже если вы оставите Spark, чтобы позаботиться об исполнителях, вы можете заметить, что он достигает максимум 11 исполнителей, даже если вы назначили 40GB ram на исполнителя и у вас есть 10 узлов. с вами. Дело в том, что память используется не полностью, а также влияет на параллелизм. Чем больше исполнителей, тем больше параллелизма. Предположим, что 40 ГБ оперативной памяти назначено 1 исполнителю, максимум 5 ГБ (которые обычно не заняты) для накладных расходов YARN и других фоновых процессов. Тем не менее, у вас осталось 19 ГБ оперативной памяти на узел.

4. Вы можете попробовать с 15, если вас не устраивает 20. Дело в том, что если у вас есть 9 исполнителей с 10 узлами и 40 ГБ оперативной памяти, предполагая, что 1 исполнитель будет на 1 узле, тогда у вас все еще есть 1 узел, который простаивает (память используется недостаточно). Если вы назначите 15, то у каждого узла будет по крайней мере 1 исполнитель, а также увеличится параллелизм, что также приведет к более быстрой обработке. Я рад узнать, что это сработало для вас 😊

5. Отлично! И да, как я уже сказал в своем ответе, в режиме кластера 1 исполнитель рассматривается как поток драйвера, поэтому я попросил вас указать 1 количество исполнителей.