#scala #apache-spark #apache-spark-sql #apache-spark-dataset
#скала #apache-искра #apache-искра-sql #apache-spark-dataset #scala #apache-spark #apache-spark-sql
Вопрос:
Я пишу структурированное потоковое приложение Spark, в котором я считываю данные из очереди Kafka и обрабатываю полученные сообщения. Конечный результат, который я хочу, — это DataSet[MyMessage]
(где MyMessage
— пользовательский объект), который я хочу поместить в очередь на другую тему Kafka. Дело в том, что каждое входное сообщение из очереди Kafka-потребителя может выдавать несколько MyMessage
объектов, поэтому преобразование не 1: 1, 1: Много.
Так что я делаю
val messagesDataSet: DataSet[List[MyMessage]] = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server1")
.option("subscribe", "topic1")
.option("failOnDataLoss", false)
.option("startingOffsets", "offset1")
.load()
.select($"value")
.mapPartitions{r => createMessages(r)}
val createMessages(row: Iterator[Row]): List[MyMessage] = {
// ...
}
Очевидно, messagesDataSet
это DataSet[List[MyMessage]]
. Есть ли способ получить просто DataSet[MyMessage]
?
Или есть способ взять DataSet[List[MyMessage]]
и затем записать каждый MyMessage
объект в другую тему Кафки? (В конце концов, это моя конечная цель)
Ответ №1:
попробуйте
messagesDataSet.flatMap(identity)
Ответ №2:
Вы можете создать несколько значений с помощью mapPartitions (так что это работает аналогично flatMap), но вам нужно вернуть итератор:
def createMessages(row: Iterator[Row]): Iterator[MyMessage] = {
row.map(/*...*/) //you need too return iterator here
}