Запись потокового фрейма данных в kafka

#apache-kafka #spark-structured-streaming

#apache-kafka #spark-структурированная потоковая передача

Вопрос:

Я читаю строки журнала из темы kafka с помощью spark structured streaming, разделяю поля строк журнала, выполняю некоторые манипуляции с полями и сохраняю их в dataframe с отдельными столбцами для каждого поля. Я хочу записать этот фрейм данных в kafka

Ниже приведен мой пример фрейма данных и writestream для его записи в kafka

  val dfStructuredWrite = dfProcessedLogs.select(
    dfProcessedLogs("result").getItem("_1").as("col1"),
    dfProcessedLogs("result").getItem("_2").as("col2"),
    dfProcessedLogs("result").getItem("_17").as("col3"))

dfStructuredWrite
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
  

Приведенный выше код выдает ошибку ниже

 Required attribute 'value' not found
  

Я полагаю, это потому, что у меня нет моего фрейма данных в формате ключ / значение.Как я могу записать свой существующий фрейм данных в kafka наиболее эффективным способом?

Ответ №1:

Фрейм данных, записываемый в Kafka, должен содержать следующие столбцы в схеме:

  • ключ (необязательно) (тип: строковый или двоичный)
  • значение (обязательно) (тип: строковый или двоичный)
  • тема (необязательно) (тип: строка)

В вашем случае нет value столбца, и генерируется исключение.

Вы должны изменить его, чтобы добавить хотя бы столбец значений, например:

 import org.apache.spark.sql.functions.{concat, lit}

dfStructuredWrite.select(concat($"col1", lit(" "), $"col2", lit(" "), $"col3").alias("value"))
  

Для получения более подробной информации вы можете проверить: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka