Проблема с памятью Spark — «Недостаточно памяти для построения хэш-карты»

#java #scala #apache-spark #apache-spark-sql #heap-memory

Вопрос:

Я выполняю левое соединение между двумя таблицами — в левой таблице(допустим, А) 1,4 миллиарда записей с оценкой размера, дающей ей размер 17 ГБ, в правой таблице(Б) 100 миллионов записей — с приблизительным размером 1,3 ГБ.

Вывод вышеупомянутого объединенного фрейма данных содержит 120 миллионов записей (существуют фильтры, которые уменьшили номер записи после левого соединения). Выходной кадр данных используется дважды — поэтому я его кэшировал.

Теперь, когда я выполнил задание в том виде, в каком оно есть, на выполнение ушло 1 час 40 минут.

Поэтому, чтобы сократить время обработки, я пытался реализовать широковещательное соединение при первом левом соединении между A и B. Теперь возникает проблема.

Я использую следующие параметры конфигурации —

 "spark.ui.threadDumpsEnabled" = "true"
"spark.memory.storageFraction" = "0.5"
"spark.sql.shuffle.partitions" = "400"
"spark.driver.maxResultSize" = "8G"
"spark.sql.broadcastTimeout" = "36000"
"spark.driver.memory" = "30G"
"spark.executor.memory" = "15G"
"spark.executor.cores" = "4"
"spark.executor.instances" = "3"
"spark.kryoserializer.buffer.max" = "1500m"
"spark.driver.cores" = "4"
"spark.dynamicAllocation.enabled" = "false"
"spark.serializer" = "org.apache.spark.serializer.KryoSerializer"
"spark.executor.defaultJavaOptions=-XX: UseG1GC -Xss100M -XX: UnlockDiagnosticVMOptions -XX: G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent" = "45"
"spark.driver.defaultJavaOptions=-XX: UseG1GC -Xss100M -XX: UnlockDiagnosticVMOptions -XX: G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent" = "45"

 

Все эти параметры появлялись на картинке один за другим, когда я сталкивался с многочисленными проблемами с памятью.

"spark.sql.broadcastTimeout" = "36000" — это произошло потому, что я столкнулся с ошибкой ниже

 [org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.logError@91] - Could not execute broadcast in 300 secs.
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
 

"spark.kryoserializer.buffer.max" = "1500m" — это произошло потому, что —

 org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 4658915. To avoid this, increase spark.kryoserializer.buffer.max value
 

Я также столкнулся с этими ошибками —

 b) java.lang.OutOfMemoryError: GC overhead limit exceeded
c) java.lang.StackOverflowError
 

Но в настоящее время я сталкиваюсь с этой проблемой, которая для меня совершенно нова, и я не смог найти решение —

 Caused by: org.apache.spark.SparkException: There is no enough memory to build hash map
    at org.apache.spark.sql.execution.joins.UnsafeHashedRelation$.apply(HashedRelation.scala:314)
    at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:109)
    at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:857)
    at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:845)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$anonfun$relationFuture$1$anonfun$apply$1.apply(BroadcastExchangeExec.scala:89)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$anonfun$relationFuture$1$anonfun$apply$1.apply(BroadcastExchangeExec.scala:76)
    at org.apache.spark.sql.execution.SQLExecution$anonfun$withExecutionId$1.apply(SQLExecution.scala:141)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
    at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:138)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:75)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:75)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 

Thinking it is coming because I am not using any action until the last step of the execution, I tried to break the lineage by using intermediate action like count

But the issue still persists, I also tried to increase the driver memory to a great extent, but no luck.

EDITS

This is the first query I was talking about

 val a = spark.sql("""
select 
/*  BROADCAST(schema.table2) */
visit_date, 
v4.visid, 
visit_num,
device_family,
operating_system,
'test' AS traffic_grouping,
'test2'Junk_Filter,
'test3' visit_start_evar26,
visit_start_pagename,
visit_start_url,
referrer_domain,
session_duration,
isp_domain,
pages_visited,
qbo_sui_click,
qbo_sui_trial_click,
qbo_sui_buy_click,
qbse_sui_click,
qbse_sui_trial_click,
qbse_sui_buy_click,
qbse_customer_flag,
qbo_customer_flag,
case when s.ivid is not null then 1 else 0 end as segment_qbo
from schema.table2 v4
left join schema.table2 s 
    on v4.ivid = s.ivid and v4.visit_date > s.qbo_signup_date and source = 'check'
WHERE visit_date >= '2020-07-14'
and country in ('Australia','Australia')
and  ((ipd_flag = 0
    AND sbbg_flag = 0)
  or lower(visit_start_evar26) like ('%buy%'))
group by 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23
""")
 

here I have done a broadcast join.

enter image description here

If anyone has any leads, it would really be a great help, thanks in advance!

enter image description here

enter image description here

enter image description here

введите описание изображения здесь

введите описание изображения здесь

Комментарии:

1. Покажите нам свой код и объясните, что вы делаете или хотите сделать с выводом?

2. Вы говорите о трансляции … трансляции каких данных?

3. У нас есть приложение, и данные связаны с людьми, посещающими наше приложение, такими как отметка времени, местоположение, устройство, с помощью которого они подключились, страницы, которые они посетили, подписались ли они на наш канал и т. Д. И т. Д. Эти данные являются конфиденциальными, иначе я бы поделился, в каждой таблице более 100 столбцов. Что касается части кода, то это очень простое и прямое левое соединение и пара фильтров, мне очень жаль, если этого недостаточно 🙂

4. Код всегда помогает… 😉 Вы пробовали не использовать кучу? какую v-образную искру вы используете?

5. Привет @jgp, добавил первый запрос — в нем была куча случаев(когда тогда), которые я удалил. И я использую spark 2.4