#apache-spark #apache-kafka #spark-structured-streaming
#apache-spark #apache-kafka #spark-structured-streaming
Вопрос:
Я пишу приложение Spark, которое считывает сообщения из темы Kafka, просматривает записи в базе данных, создает новые сообщения и публикует их в другой теме Kafka. Вот как выглядит мой код —
val inputMessagesDataSet: DataSet[InputMessage] = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server1")
.option("subscribe", "input-kafka-topic1")
.load()
.select($"value")
.mapPartitions{r =>
val messages: Iterator[InputMessage] = parseMessages(r)
}
inputMessagesDataSet
.writeStream
.foreachBatch(processMessages _)
.trigger(trigger)
.start
.awaitTermination
def processMessages(inputMessageDataSet: Dataset[InputMessage]) = {
// fetch stuff from DB and build a DataSet[OutputMessage]
val outputMessagesDataSet: DataSet[OutputMessage] = ...
// now queue to another kafka topic
outputMessagesDataSet
.writeStream
.trigger(trigger)
.format("kafka")
.option("kafka.bootstrap.servers", "server1")
.option("topic", "output-kafka-topic")
.option("checkpointLocation", loc)
.start
.awaitTermination
}
Но я получаю сообщение об ошибке
org.apache.spark.sql.AnalysisException: 'writeStream' can be called only on streaming Dataset/DataFrame;
онлайн outputMessagesDataSet.writeStream
Похоже, это связано с тем, что outputMessagesDataSet
не создается с помощью readStream
. Причина, по которой я не создаю DataSet[OutputMessage]
в оригинале mapPartitions()
, заключается в том, что классы, необходимые для извлечения записей БД и т.д., Не сериализуемы, поэтому он выдает NotSerializableException
.
Как мне создать новый набор данных и поставить его в очередь в Kafka?
Комментарии:
1. почему вы используете foreachBatch?
2. @AlexOtt есть ли альтернатива?
3. просто используйте
writeStream
: spark.apache.org/docs/latest /…4. @AlexOtt ах, это имеет смысл
Ответ №1:
foreachBatch
принимает статический набор данных, поэтому вам нужно использовать write
, а не writeStream
Кроме того, вы можете writeStream.format("kafka")
без использования forEachBatch