#scala #apache-spark #apache-kafka
#scala #apache-spark #apache-kafka
Вопрос:
У меня есть задача прочитать каждое сообщение в теме Kafka, в которой есть 3 раздела. У меня есть Spark и Scala в качестве инструментов для этого.
Пока у меня есть следующий код:
val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe
[String, String](cdrTopic, kafkaParams))
kafkaStream.foreachRDD { rdd => {
if (!rdd.isEmpty()) {
rdd.foreachPartition(iterator => {
while (iterator.hasNext) {
val partition = iterator.next().value().split("\n")
if (!partition.isEmpty) {
partition.foreach(string => {
if (!string.isEmpty) {
//process every message here
}
})
}
println("no partition data")
}
})
}
println("no rdd data to process")
}
}
Каждое сообщение представляет собой строку tsv (значения, разделенные табуляцией), которую мне нужно проанализировать и сохранить в БД.
Я считаю, что это очень неэффективно, потому что я использую 4 цикла, и я думаю, что некоторые данные теряются, поскольку количество сообщений, записанных в db, очень мало.
Я использую scala версии 2.11.12, spark-streaming 2.4.0, spark-streaming-kafka 2.4.0.
Есть ли какой-либо эффективный способ чтения и анализа каждого сообщения в kafka с использованием spark?
Комментарии:
1. Привет @Aidai, какую базу данных вы планируете использовать?
2. @AlexandrosBiratsis В настоящее время я вставляю данные в Ignite. Но также планирую внедрить persist в PostgreSQL.