#apache-kafka #spark-structured-streaming
#apache-kafka #spark-структурированная потоковая передача
Вопрос:
Я читаю строки журнала из темы kafka с помощью spark structured streaming, разделяю поля строк журнала, выполняю некоторые манипуляции с полями и сохраняю их в dataframe с отдельными столбцами для каждого поля. Я хочу записать этот фрейм данных в kafka
Ниже приведен мой пример фрейма данных и writestream для его записи в kafka
val dfStructuredWrite = dfProcessedLogs.select(
dfProcessedLogs("result").getItem("_1").as("col1"),
dfProcessedLogs("result").getItem("_2").as("col2"),
dfProcessedLogs("result").getItem("_17").as("col3"))
dfStructuredWrite
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
Приведенный выше код выдает ошибку ниже
Required attribute 'value' not found
Я полагаю, это потому, что у меня нет моего фрейма данных в формате ключ / значение.Как я могу записать свой существующий фрейм данных в kafka наиболее эффективным способом?
Ответ №1:
Фрейм данных, записываемый в Kafka, должен содержать следующие столбцы в схеме:
- ключ (необязательно) (тип: строковый или двоичный)
- значение (обязательно) (тип: строковый или двоичный)
- тема (необязательно) (тип: строка)
В вашем случае нет value
столбца, и генерируется исключение.
Вы должны изменить его, чтобы добавить хотя бы столбец значений, например:
import org.apache.spark.sql.functions.{concat, lit}
dfStructuredWrite.select(concat($"col1", lit(" "), $"col2", lit(" "), $"col3").alias("value"))
Для получения более подробной информации вы можете проверить: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka