Как закодировать структуры в запись Avro в Spark?

#apache-spark #spark-avro

#apache-spark #spark-avro

Вопрос:

Я пытаюсь использовать to_avro() функцию для создания записей Avro. Однако я не могу закодировать несколько столбцов, так как некоторые столбцы просто теряются после кодирования. Простой пример для воссоздания проблемы:

 val schema = StructType(List(
  StructField("entity_type", StringType),
  StructField("entity", StringType)
))
val rdd = sc.parallelize(Seq(
  Row("PERSON", "John Doe")
))
val df = sqlContext.createDataFrame(rdd, schema)

df
  .withColumn("struct", struct(col("entity_type"), col("entity")))
  .select("struct")
  .collect()
  .foreach(println)

// prints [[PERSON, John Doe]]
 
 df
  .withColumn("struct", struct(col("entity_type"), col("entity")))
  .select(to_avro(col("struct")).as("value"))
  .select(from_avro(col("value"), entitySchema).as("entity"))
  .collect()
  .foreach(println)

// prints [[, PERSON]]
 

Моя схема выглядит так

 {
  "type" : "record",
  "name" : "Entity",
  "fields" : [ {
    "name" : "entity_type",
    "type" : "string"
  },
  {
    "name" : "entity",
    "type" : "string"
  } ]
}
 

Что интересно, если я изменю порядок столбцов в структуре, результат будет [, John Doe]

Я использую Spark 2.4.5. Согласно документации Spark: «to_avro() может использоваться для превращения структур в записи Avro. Этот метод особенно полезен, когда вы хотите перекодировать несколько столбцов в один при записи данных в Kafka. »

Ответ №1:

Он работает после изменения типов полей с "string" на ["string", "null"] . Не уверен, предназначено ли это поведение.