Данные Avro не преобразованы в Spark

#apache-spark #apache-kafka #spark-avro

#apache-spark #apache-kafka #spark-avro

Вопрос:

Я записал один из столбцов фрейма данных Spark в Kafka в формате Avro. Затем я пытаюсь прочитать данные из этого раздела и преобразовать из Avro в столбец фрейма данных. Тип данных — это временная метка, и вместо временных меток из базы данных я получаю некоторые значения по умолчанию:

 1970-01-01 00:00:00
1970-01-01 00:00:00
1970-01-01 00:00:00
1970-01-01 00:00:00
1970-01-01 00:00:00
1970-01-01 00:00:00
1970-01-01 00:00:00
1970-01-01 00:00:00
1970-01-01 00:00:00
1970-01-01 00:00:00
  

Такое же поведение можно заметить со столбцами других типов данных, таких как String. Начальное значение временной метки выглядит следующим образом, и это результат, который я хочу получить:

 2019-03-19 12:26:03.003
2019-03-19 12:26:09    
2019-03-19 12:27:04.003
2019-03-19 12:27:08.007
2019-03-19 12:28:01.013
2019-03-19 12:28:05.007
2019-03-19 12:28:09.023       
2019-03-19 12:29:04.003
2019-03-19 12:29:07.047
2019-03-19 12:30:00.003
  

И вот те же данные после преобразования в Avro:

 00 F0 E1 9B BC B3 9C C2 05
00 80 E9 F7 C1 B3 9C C2 05
00 F0 86 B2 F6 B3 9C C2 05
00 B0 E9 9A FA B3 9C C2 05
00 90 A4 E1 AC B4 9C C2 05
00 B0 EA C8 B0 B4 9C C2 05
00 B0 88 B3 B4 B4 9C C2 05
00 F0 BE EA E8 B4 9C C2 05
00 B0 89 DE EB B4 9C C2 05
00 F0 B6 9E 9E B5 9C C2 05
  

Что я могу сделать, чтобы исправить эту проблему с преобразованием?

Код для записи Avro в Kafka, считывания его и обратного преобразования во фрейм данных. Я пытался использовать методы to_avro и from_avro Spark-avro:

 import org.apache.spark.sql.avro._

    val castDF = testDataDF.select(to_avro(testDataDF.col("update_database_time")) as 'value)

    castDF
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("topic", "app_state_test")
      .save()

    val cachedDf = spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", "app_state_test")
      .load()

     val jsonSchema = "{"name": "update_database_time", "type": "long",  "logicalType": "timestamp-millis", "default": "NONE"}"
    cachedDf.select(from_avro(cachedDf.col("value"), jsonSchema) as 'test)