#scala #apache-spark #cassandra #apache-spark-sql #spark-streaming
#scala #apache-spark #cassandra #apache-spark-sql #потоковая передача spark
Вопрос:
Я пытаюсь определить статус завершения на различных уровнях детализации. Например, регион является «завершенным», если все города в регионе завершены.
Я поддерживаю состояние на самом низком уровне (town) в памяти, используя следующий подход в Spark:
Шаг 1. Загрузите начальное состояние из таблицы Cassandra во фрейм данных Spark.
---------- -------- -------- ------------
| country | region | town | isComplete |
---------- -------- -------- ------------
| Country1 | State1 | Town1 | FALSE |
| Country1 | State1 | Town2 | FALSE |
| Country1 | State1 | Town3 | FALSE |
| Country1 | State1 | Town4 | FALSE |
| Country1 | State1 | Town5 | FALSE |
| Country1 | State1 | Town6 | FALSE |
| Country1 | State1 | Town7 | FALSE |
| Country1 | State1 | Town8 | FALSE |
| Country1 | State1 | Town9 | FALSE |
| Country1 | State1 | Town10 | FALSE |
| Country1 | State1 | Town11 | FALSE |
---------- -------- -------- ------------
Шаг 2. Запустите потоковую обработку и, используя фреймы данных, созданные в каждом микропакете, попробуйте обновить состояние фрейма данных с шага 1, используя левое внешнее соединение.
Пакет1:
---------- -------- ------- ------------
| country | region | town | isComplete |
---------- -------- ------- ------------
| Country1 | State1 | Town1 | TRUE |
| Country1 | State1 | Town2 | TRUE |
| Country1 | State1 | Town3 | TRUE |
| Country1 | State1 | Town4 | TRUE |
---------- -------- ------- ------------
После применения пакета 1 :
---------- -------- -------- ------------
| country | state | town | isComplete |
---------- -------- -------- ------------
| Country1 | State1 | Town1 | TRUE |
| Country1 | State1 | Town2 | TRUE |
| Country1 | State1 | Town3 | TRUE |
| Country1 | State1 | Town4 | TRUE |
| Country1 | State1 | Town5 | FALSE |
| Country1 | State1 | Town6 | FALSE |
| Country1 | State1 | Town7 | FALSE |
| Country1 | State1 | Town8 | FALSE |
| Country1 | State1 | Town9 | FALSE |
| Country1 | State1 | Town10 | FALSE |
| Country1 | State1 | Town11 | FALSE |
---------- -------- -------- ------------
Моя идея заключается в том, что, сохраняя изменяемый фрейм данных, я мог бы обновлять его в каждом пакете и поддерживать общее состояние (например, глобальную переменную) на протяжении всего срока службы потокового задания.
Базовый набор данных составляет около 1,2 миллиона записей (приблизительно 100 МБ) и, как ожидается, будет масштабироваться до 10 ГБ.
Я сталкиваюсь с проблемами нехватки памяти. Каждый пакет занимает значительно больше времени обработки, чем предыдущий пакет. Также количество этапов для одного и того же задания увеличивается в разных пакетах. В конечном итоге приложение завершается сбоем из-за превышения предела накладных расходов GC.
var statusDf = loadStatusFromCassandra(sparkSession)
ipStream.foreachRDD { statusMsgRDD =>
if (!statusMsgRDD.isEmpty) {
// 1. Create data-frame from the current micro-batch RDD
val messageDf = getMessageDf(sparkSession, statusMsgRDD)
// 2. To update, Left outer join statusDf with messageDf
statusDf = updateStatusDf(sparkSession, statusDf, messageDf)
// 3. Use updated statusDf to generate aggregations at higher levels
// and publish to a Kafka topic
// if a higher level (eg. region) is completed.
}
}
Комментарии:
1. Вы нашли ответ?
2. Нет, у меня нет. Честно говоря, я мало что осмотрел. Мы вернулись к выполнению результатов без сохранения состояния после обработки каждого микропакета в Cassandra.