#scala #apache-spark #apache-kafka #jsonschema #spark-structured-streaming
Вопрос:
у меня есть данные , поступающие в тему кафки, в которой есть необязательный объект, и, поскольку он необязателен, мне не хватает этих записей при чтении с определенной схемой
экс :
схема у меня есть :
val schema =new StructType()
.add("obj_1", new ArrayType(
new StructType(
Array(
StructField("field1",StringType),
StructField("field2",StringType),
StructField("obj_2",new ArrayType(
new StructType(
Array(
StructField("field3",StringType),
StructField("field4",LongType),
StructField("obj_3",new ArrayType(
new StructType(
Array(
StructField("field5",StringType),
StructField("field6",StringType),
)
),containsNull = true
)
)
)
),containsNull = true
)),
StructField("field7",StringType),
StructField("field8",StringType))), containsNull = true))
при публикации данных в этой теме мы иногда не отправляем obj_3 на основании некоторых условий.
поэтому при чтении темы и сопоставлении ее с приведенной выше схемой нам не хватает данных, которые не будут содержать эти obj_3 и будут содержать только данные с присутствием этого obj_3 .
как прочитать данные, которые когда-нибудь не будут иметь obj_3.
пример кода :
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers","bootstrap.servers")
.option("subscribe", "topic.name")
.option("startingOffsets", "offset.reset)
.option("failOnDataLoss","true")
.load()
val cast = df.selectExpr( "CAST(value AS STRING)")
.as[( String)]
val resultedDf = cast.select(from_json(col("value"), schema).as("newDF"))
val finalDf = resultedDf.select(col("newDF.*"))
Ответ №1:
Вы могли бы либо
- используйте флаг (например, называемый «obj3flag» в структуре JSON) в ключе сообщения Кафки, который сообщает вашему структурированному потоковому заданию, существует ли obj_3 в значении Кафки, а затем выберите либо одну, либо другую схему для анализа строки json. Что-то вроде:
import org.apache.spark.sql.functions._
val schemaWithObj3 = ...
val schemaWithOutObj3 = ...
val cast = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
val resultedDf = cast
.withColumn("obj_3_flag", get_json_object(col("key"), "$.obj3flag"))
.withColumn("data", when(col("obj3_flag") === lit(1), from_json(col("value"), schemaWithObj3).otherwise(from_json(col("value"), schemaWithOutObj3)))
.select(col("data.*"))
- выполните поиск по строке «obj_3» в значении Кафки (приведенном как строка), и если строка найдена, примените ту или иную схему для анализа json. Код будет выглядеть очень похоже на код для другого варианта.
Пожалуйста, обратите внимание, что я написал код на своем мобильном телефоне, поэтому вы можете обнаружить некоторые проблемы с синтаксисом. Однако, надеюсь, эта идея дойдет до вас.