Как опубликовать в Kafka из Spark, используя структурированную потоковую передачу?

#apache-spark #apache-kafka #spark-structured-streaming

#apache-spark #apache-kafka #spark-structured-streaming

Вопрос:

Я пишу приложение Spark, которое считывает сообщения из темы Kafka, просматривает записи в базе данных, создает новые сообщения и публикует их в другой теме Kafka. Вот как выглядит мой код —

 val inputMessagesDataSet: DataSet[InputMessage] = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server1")
  .option("subscribe", "input-kafka-topic1")
  .load()
  .select($"value")
  .mapPartitions{r =>
     val messages: Iterator[InputMessage] = parseMessages(r)
  }

inputMessagesDataSet
  .writeStream
  .foreachBatch(processMessages _)
  .trigger(trigger)
  .start
  .awaitTermination

def processMessages(inputMessageDataSet: Dataset[InputMessage]) = {
   // fetch stuff from DB and build a DataSet[OutputMessage]
   val outputMessagesDataSet: DataSet[OutputMessage] = ...
   // now queue to another kafka topic
  outputMessagesDataSet
      .writeStream
      .trigger(trigger)
      .format("kafka")
      .option("kafka.bootstrap.servers", "server1")
      .option("topic", "output-kafka-topic")
      .option("checkpointLocation", loc)
      .start
      .awaitTermination
}
  

Но я получаю сообщение об ошибке

org.apache.spark.sql.AnalysisException: 'writeStream' can be called only on streaming Dataset/DataFrame; онлайн outputMessagesDataSet.writeStream

Похоже, это связано с тем, что outputMessagesDataSet не создается с помощью readStream . Причина, по которой я не создаю DataSet[OutputMessage] в оригинале mapPartitions() , заключается в том, что классы, необходимые для извлечения записей БД и т.д., Не сериализуемы, поэтому он выдает NotSerializableException .

Как мне создать новый набор данных и поставить его в очередь в Kafka?

Комментарии:

1. почему вы используете foreachBatch?

2. @AlexOtt есть ли альтернатива?

3. просто используйте writeStream : spark.apache.org/docs/latest /…

4. @AlexOtt ах, это имеет смысл

Ответ №1:

foreachBatch принимает статический набор данных, поэтому вам нужно использовать write , а не writeStream

Кроме того, вы можете writeStream.format("kafka") без использования forEachBatch