#java #scala #apache-spark #apache-spark-sql #heap-memory
Вопрос:
Я выполняю левое соединение между двумя таблицами — в левой таблице(допустим, А) 1,4 миллиарда записей с оценкой размера, дающей ей размер 17 ГБ, в правой таблице(Б) 100 миллионов записей — с приблизительным размером 1,3 ГБ.
Вывод вышеупомянутого объединенного фрейма данных содержит 120 миллионов записей (существуют фильтры, которые уменьшили номер записи после левого соединения). Выходной кадр данных используется дважды — поэтому я его кэшировал.
Теперь, когда я выполнил задание в том виде, в каком оно есть, на выполнение ушло 1 час 40 минут.
Поэтому, чтобы сократить время обработки, я пытался реализовать широковещательное соединение при первом левом соединении между A и B. Теперь возникает проблема.
Я использую следующие параметры конфигурации —
"spark.ui.threadDumpsEnabled" = "true"
"spark.memory.storageFraction" = "0.5"
"spark.sql.shuffle.partitions" = "400"
"spark.driver.maxResultSize" = "8G"
"spark.sql.broadcastTimeout" = "36000"
"spark.driver.memory" = "30G"
"spark.executor.memory" = "15G"
"spark.executor.cores" = "4"
"spark.executor.instances" = "3"
"spark.kryoserializer.buffer.max" = "1500m"
"spark.driver.cores" = "4"
"spark.dynamicAllocation.enabled" = "false"
"spark.serializer" = "org.apache.spark.serializer.KryoSerializer"
"spark.executor.defaultJavaOptions=-XX: UseG1GC -Xss100M -XX: UnlockDiagnosticVMOptions -XX: G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent" = "45"
"spark.driver.defaultJavaOptions=-XX: UseG1GC -Xss100M -XX: UnlockDiagnosticVMOptions -XX: G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent" = "45"
Все эти параметры появлялись на картинке один за другим, когда я сталкивался с многочисленными проблемами с памятью.
"spark.sql.broadcastTimeout" = "36000"
— это произошло потому, что я столкнулся с ошибкой ниже
[org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.logError@91] - Could not execute broadcast in 300 secs.
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
"spark.kryoserializer.buffer.max" = "1500m"
— это произошло потому, что —
org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 4658915. To avoid this, increase spark.kryoserializer.buffer.max value
Я также столкнулся с этими ошибками —
b) java.lang.OutOfMemoryError: GC overhead limit exceeded
c) java.lang.StackOverflowError
Но в настоящее время я сталкиваюсь с этой проблемой, которая для меня совершенно нова, и я не смог найти решение —
Caused by: org.apache.spark.SparkException: There is no enough memory to build hash map
at org.apache.spark.sql.execution.joins.UnsafeHashedRelation$.apply(HashedRelation.scala:314)
at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:109)
at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:857)
at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:845)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$anonfun$relationFuture$1$anonfun$apply$1.apply(BroadcastExchangeExec.scala:89)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$anonfun$relationFuture$1$anonfun$apply$1.apply(BroadcastExchangeExec.scala:76)
at org.apache.spark.sql.execution.SQLExecution$anonfun$withExecutionId$1.apply(SQLExecution.scala:141)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:138)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:75)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:75)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Thinking it is coming because I am not using any action
until the last step of the execution, I tried to break the lineage by using intermediate action
like count
But the issue still persists, I also tried to increase the driver memory to a great extent, but no luck.
EDITS
This is the first query I was talking about
val a = spark.sql("""
select
/* BROADCAST(schema.table2) */
visit_date,
v4.visid,
visit_num,
device_family,
operating_system,
'test' AS traffic_grouping,
'test2'Junk_Filter,
'test3' visit_start_evar26,
visit_start_pagename,
visit_start_url,
referrer_domain,
session_duration,
isp_domain,
pages_visited,
qbo_sui_click,
qbo_sui_trial_click,
qbo_sui_buy_click,
qbse_sui_click,
qbse_sui_trial_click,
qbse_sui_buy_click,
qbse_customer_flag,
qbo_customer_flag,
case when s.ivid is not null then 1 else 0 end as segment_qbo
from schema.table2 v4
left join schema.table2 s
on v4.ivid = s.ivid and v4.visit_date > s.qbo_signup_date and source = 'check'
WHERE visit_date >= '2020-07-14'
and country in ('Australia','Australia')
and ((ipd_flag = 0
AND sbbg_flag = 0)
or lower(visit_start_evar26) like ('%buy%'))
group by 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23
""")
here I have done a broadcast join.
If anyone has any leads, it would really be a great help, thanks in advance!
Комментарии:
1. Покажите нам свой код и объясните, что вы делаете или хотите сделать с выводом?
2. Вы говорите о трансляции … трансляции каких данных?
3. У нас есть приложение, и данные связаны с людьми, посещающими наше приложение, такими как отметка времени, местоположение, устройство, с помощью которого они подключились, страницы, которые они посетили, подписались ли они на наш канал и т. Д. И т. Д. Эти данные являются конфиденциальными, иначе я бы поделился, в каждой таблице более 100 столбцов. Что касается части кода, то это очень простое и прямое левое соединение и пара фильтров, мне очень жаль, если этого недостаточно 🙂
4. Код всегда помогает… 😉 Вы пробовали не использовать кучу? какую v-образную искру вы используете?
5. Привет @jgp, добавил первый запрос — в нем была куча случаев(когда тогда), которые я удалил. И я использую spark 2.4