Как читать данные из раздела кафки с другой схемой (имеет некоторые дополнительные объекты) в структурированной потоковой передаче

#scala #apache-spark #apache-kafka #jsonschema #spark-structured-streaming

Вопрос:

у меня есть данные , поступающие в тему кафки, в которой есть необязательный объект, и, поскольку он необязателен, мне не хватает этих записей при чтении с определенной схемой

экс :

схема у меня есть :

 val schema =new StructType()
      .add("obj_1", new ArrayType(
        new StructType(
          Array(
            StructField("field1",StringType),
            StructField("field2",StringType),
            StructField("obj_2",new ArrayType(
              new StructType(
                Array(
                  StructField("field3",StringType),
                  StructField("field4",LongType),
                  StructField("obj_3",new ArrayType(
                    new StructType(
                      Array(
                        StructField("field5",StringType),
                        StructField("field6",StringType),
                      )
                    ),containsNull = true
                  )
                  )
                )
              ),containsNull = true
            )),
            StructField("field7",StringType),
            StructField("field8",StringType))), containsNull = true))
   
 

при публикации данных в этой теме мы иногда не отправляем obj_3 на основании некоторых условий.

поэтому при чтении темы и сопоставлении ее с приведенной выше схемой нам не хватает данных, которые не будут содержать эти obj_3 и будут содержать только данные с присутствием этого obj_3 .

как прочитать данные, которые когда-нибудь не будут иметь obj_3.

пример кода :

  val df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers","bootstrap.servers")
        .option("subscribe", "topic.name")
        .option("startingOffsets", "offset.reset)
        .option("failOnDataLoss","true")
        .load()

      val cast = df.selectExpr( "CAST(value AS STRING)")
        .as[( String)]

val resultedDf = cast.select(from_json(col("value"), schema).as("newDF"))

 val finalDf = resultedDf.select(col("newDF.*"))

 

Ответ №1:

Вы могли бы либо

  • используйте флаг (например, называемый «obj3flag» в структуре JSON) в ключе сообщения Кафки, который сообщает вашему структурированному потоковому заданию, существует ли obj_3 в значении Кафки, а затем выберите либо одну, либо другую схему для анализа строки json. Что-то вроде:
 import org.apache.spark.sql.functions._
val schemaWithObj3 = ...
val schemaWithOutObj3 = ...

val cast = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .as[(String, String)]

val resultedDf = cast
  .withColumn("obj_3_flag", get_json_object(col("key"), "$.obj3flag"))
  .withColumn("data", when(col("obj3_flag") === lit(1), from_json(col("value"), schemaWithObj3).otherwise(from_json(col("value"), schemaWithOutObj3))) 
  .select(col("data.*"))
 
  • выполните поиск по строке «obj_3» в значении Кафки (приведенном как строка), и если строка найдена, примените ту или иную схему для анализа json. Код будет выглядеть очень похоже на код для другого варианта.

Пожалуйста, обратите внимание, что я написал код на своем мобильном телефоне, поэтому вы можете обнаружить некоторые проблемы с синтаксисом. Однако, надеюсь, эта идея дойдет до вас.