#apache-spark #apache-kafka #apache-spark-sql #spark-structured-streaming
#apache-spark #apache-kafka #apache-spark-sql #spark-structured-streaming
Вопрос:
Я использую spark structured streaming для использования темы kafka, которая имеет несколько типов сообщений (разные схемы каждого типа). Я определяю схему, в которой есть все поля для разных типов сообщений.
Как я могу фильтровать пустые поля из фрейма данных для каждой строки или как я могу прочитать фрейм данных из kafka с помощью динамической схемы.
val inputDS = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "overview")
.load()
val schemaa: StructType = StructType(
Array(
StructField("title", StringType, true),
StructField("url", StringType, true),
StructField("content", StringType, true),
StructField("collect_time", StringType, true),
StructField("time", StringType, true),
StructField("user_head", StringType, true),
StructField("image", StringType, true)
)
)
inputDS.withColumn("value", from_json($"value".cast(StringType), schemaa))
//.filter() // todo filter empty field
.writeStream
.format("console")
.start()
.awaitTermination()
Комментарии:
1. Вы не можете удалить отдельные столбцы из строк. Возможно, прочитайте его из Kafka в RDD, определите тип сообщения и переместите его в фрейм данных правильной схемы.
2. @Andrew большое спасибо. Я уже включил чтение из Kafka в RDD, и я хочу обновить его с spark streaming до spark structured streaming.