Как эффективно читать каждое сообщение в теме Kafka с помощью Spark (используя scala)?

#scala #apache-spark #apache-kafka

#scala #apache-spark #apache-kafka

Вопрос:

У меня есть задача прочитать каждое сообщение в теме Kafka, в которой есть 3 раздела. У меня есть Spark и Scala в качестве инструментов для этого.

Пока у меня есть следующий код:

 val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe
      [String, String](cdrTopic, kafkaParams))

kafkaStream.foreachRDD { rdd => {

      if (!rdd.isEmpty()) {

        rdd.foreachPartition(iterator => {

          while (iterator.hasNext) {

            val partition = iterator.next().value().split("\n")

            if (!partition.isEmpty) {

              partition.foreach(string => {

                if (!string.isEmpty) {

                  //process every message here
                }

              })
            }

            println("no partition data")

          }

        })

      }
      println("no rdd data to process")
      }
    }

  

Каждое сообщение представляет собой строку tsv (значения, разделенные табуляцией), которую мне нужно проанализировать и сохранить в БД.

Я считаю, что это очень неэффективно, потому что я использую 4 цикла, и я думаю, что некоторые данные теряются, поскольку количество сообщений, записанных в db, очень мало.

Я использую scala версии 2.11.12, spark-streaming 2.4.0, spark-streaming-kafka 2.4.0.

Есть ли какой-либо эффективный способ чтения и анализа каждого сообщения в kafka с использованием spark?

Комментарии:

1. Привет @Aidai, какую базу данных вы планируете использовать?

2. @AlexandrosBiratsis В настоящее время я вставляю данные в Ignite. Но также планирую внедрить persist в PostgreSQL.