Искровая оптимизация — объединения — очень низкое количество задач — ООМ

#postgresql #amazon-web-services #scala #apache-spark #out-of-memory

#postgresql #amazon-веб-сервисы #scala #apache-spark #нехватка памяти

Вопрос:

Сбой моего приложения Spark с этой ошибкой : Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
Это то, что я получаю, когда проверяю журнал контейнера : java.lang.OutOfMemoryError: Java heap space

Мое приложение в основном получает таблицу, а затем объединяет разные таблицы, которые я прочитал из aws S3:

 var result = readParquet(table1)  
val table2 = readParquet(table2)

result = result.join(table2 , result(primaryKey) === table2(foreignKey))

val table3 = readParquet(table3)

result = result.join(table3 , result(primaryKey) === table3(foreignKey))

val table4 = readParquet(table4)

result = result.join(table4 , result(primaryKey) === table4(foreignKey))
  

и так далее

Сбой моего приложения при попытке сохранить мой результирующий фрейм данных в postgresql с помощью :

 result.toDF(df.columns.map(x => x.toLowerCase()): _*).write
  .mode("overwrite")
  .format("jdbc")
  .option(JDBCOptions.JDBC_TABLE_NAME, table)
  .save()
  

На этапе неудачного объединения у меня очень низкое количество задач: 6 задач для 4 исполнителей

введите описание изображения здесь

Почему мой этап Stage генерирует 2 задания?

Первая задача завершена с 426 задачами :

введите описание изображения здесь

и второй сбой :

введите описание изображения здесь

Мой spark-отправить conf :

 dynamicAllocation = true  
num core = 2
driver memory = 6g
executor memory = 6g
max num executor = 10
min num executor = 1
spark.default.parallelism = 400
spark.sql.shuffle.partitions = 400
  

Я пробовал использовать больше ресурсов, но та же проблема :

  num core = 5
 driver memory = 16g
 executor memory = 16g
 num executor = 20
  

Я думаю, что все данные отправляются в один и тот же раздел / исполнителя даже с номером раздела по умолчанию 400, и это вызывает ошибку ООМ

Я пытался (безуспешно): persit data
broadcastJoin, но моя таблица недостаточно мала, чтобы транслировать ее в конце.
перераспределение на большее число (4000) и подсчет между каждым объединением для выполнения действия :

моя основная таблица растет очень быстро:
(количество строк ) 40 -> 68 -> 7304 -> 946 832 -> 123 032 864 -> 246 064 864 -> ( слишком много времени спустя )
Однако размер данных очень мал

Если я посмотрю на показатели задач, интересная вещь заключается в том, что мои данные перекошены (я действительно не уверен)
В последнем действии подсчета, я вижу, что ~ 120 задач выполняют действие с ~ 10 МБ входных данных для 100 записей и 12 секунд, а остальные 3880 задач абсолютно ничего не делают (3 мс, 0 записей 16B (метаданные ? )):

введите описание изображения здесь

Комментарии:

1. объем памяти драйвера = 16 гб — это слишком большая нагрузка на память, и ее не нужно использовать, когда у вас есть огромный набор данных для обработки с помощью таких действий, как (collect())

2. выполните перераспределение при чтении файлов readParquet(table1).перераспределение(x) … если одна из таблиц маленькая, вы можете транслировать это и удалить join use mapPartition и использовать широковещательную переменную в качестве кэша поиска.

3. пожалуйста, укажите размер вашей таблицы, о котором идет речь, для получения лучших предложений

4. Как это связано с PostgreSQL?

5. @kavetiraviteja Спасибо за информацию, я сделаю замену ключей, чтобы решить свою проблему. Вы можете оставить свой комментарий в качестве ответа, и я приму его.

Ответ №1:

объем памяти драйвера = 16 гб — это слишком большой объем памяти, и он не нужен. используйте только тогда, когда у вас есть огромный набор данных для обработки с помощью таких действий, как (collect() ) обязательно увеличьте spark.maxResult.size, если это так

вы можете выполнить следующие действия

— Выполните перераспределение при чтении файлов readParquet(table1).перераспределение (x). если одна из таблиц маленькая, вы можете транслировать это и удалить join, вместо этого используйте mapPartition и используйте широковещательную переменную в качестве кэша поиска.

(ИЛИ)

Выберите столбец, который равномерно распределен, и соответствующим образом перераспределите вашу таблицу, используя этот конкретный столбец.

Два момента, на которые мне нужно обратить внимание, посмотрев на приведенную выше статистику. ваша работа имеет высокую задержку планирования, которая вызвана слишком большим количеством задач и статистикой вашей задачи, немногие статистические данные запускаются с входными данными в 10 байт и немногие запускаются с 9 МБ …. очевидно, что здесь имеет место асимметрия данных… как вы сказали, первая завершена с 426 задачами, но с 4000 в качестве количества перераспределений она должна запускать больше задач

пожалуйста, посмотрите на https://towardsdatascience.com/the-art-of-joining-in-spark-dcbd33d693c … для получения дополнительной информации.