#scala #apache-spark #apache-kafka #spark-kafka-integration
Вопрос:
Я использую Spark 3.0.1 с кафкой для агрегирования данных 12 5 000 000 пользователей. Скорость записи составляет около 200000 в секунду. Режим вывода — «Добавить», а база данных-Mongo. Spark считывает данные из кафки и записывает результат в Монго. Однако приложение тратит много времени на GC. Почему приложение использовало так много памяти? Я хочу знать, есть ли какое-либо решение для своевременного освобождения памяти или моя конфигурация неверна?
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaConn)
.option("subscribe", "user_topic")
.load()
val query = df
.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
.as[(String, String)]
.select(from_json($"value", jsonSchema).as("data"))
.selectExpr(
"data.AgentID", "data.subscriber", "data.app", "data.octCount","data.Time"
).selectExpr("cast(Time/1000 as TIMESTAMP) as Time", "subscriber", "app", "FD", "octCount")
.withWatermark("Time", "1 minutes")
.groupBy(window($"Time", "15 minutes", "15 minutes"), $"subscriber", $"app")
.agg(sum(col("octCount")) as "oct_count")
.selectExpr("cast(window.start as LONG)*1000 as time", "subscriber", "app", "oct_count")
.writeStream
.outputMode("append")
Рабочий параметр-это:
--num-executors 2 --executor-cores 8 --executor-memory 26G --conf spark.default.parallelism=16 --conf spark.sql.shuffle.partitions=16 --conf spark.memory.fraction=0.1 --conf "spark.executor.extraJavaOptions=-XX: UseG1GC -XX: PrintFlagsFinal -XX: PrintReferenceGC -verbose:gc -XX: PrintGCDetails -XX: PrintGCTimeStamps -XX: PrintAdaptiveSizePolicy -XX: UnlockDiagnosticVMOptions -XX: G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=6
Комментарии:
1. databricks.com/blog/2015/05/28/… , вы видели это руководство ? Для дальнейшей настройки вам потребуется предоставить файлы gc.log и поискать в них интересные шаблоны или сообщения