Как создать набор данных Spark, когда преобразование выполняется не 1:1, а 1:много

#scala #apache-spark #apache-spark-sql #apache-spark-dataset

#скала #apache-искра #apache-искра-sql #apache-spark-dataset #scala #apache-spark #apache-spark-sql

Вопрос:

Я пишу структурированное потоковое приложение Spark, в котором я считываю данные из очереди Kafka и обрабатываю полученные сообщения. Конечный результат, который я хочу, — это DataSet[MyMessage] (где MyMessage — пользовательский объект), который я хочу поместить в очередь на другую тему Kafka. Дело в том, что каждое входное сообщение из очереди Kafka-потребителя может выдавать несколько MyMessage объектов, поэтому преобразование не 1: 1, 1: Много.

Так что я делаю

 val messagesDataSet: DataSet[List[MyMessage]] = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "server1")
      .option("subscribe", "topic1")
      .option("failOnDataLoss", false)
      .option("startingOffsets", "offset1")
      .load()
      .select($"value")
      .mapPartitions{r => createMessages(r)}

val createMessages(row: Iterator[Row]): List[MyMessage] = {
   // ...
}
  

Очевидно, messagesDataSet это DataSet[List[MyMessage]] . Есть ли способ получить просто DataSet[MyMessage] ?

Или есть способ взять DataSet[List[MyMessage]] и затем записать каждый MyMessage объект в другую тему Кафки? (В конце концов, это моя конечная цель)

Ответ №1:

попробуйте

 messagesDataSet.flatMap(identity)
  

Ответ №2:

Вы можете создать несколько значений с помощью mapPartitions (так что это работает аналогично flatMap), но вам нужно вернуть итератор:

   def createMessages(row: Iterator[Row]): Iterator[MyMessage] = {
    row.map(/*...*/) //you need too return iterator here
  }