Десериализация Avro пакета Spark: Искаженные данные. Длина отрицательная

#apache-spark #avro #confluent-schema-registry #spark-avro

Вопрос:

Я занимаюсь пакетной обработкой Кафки с помощью Spark. Запись сериализована как Avro. Я пытаюсь десериализовать значение, используя точную схему в самом сообщении, но получаю исключение неправильной записи. Вот мой код:

     Dataset<Row> load = sparkSession
            .read()
            .format("kafka")
            .option("kafka.bootstrap.servers", (String) kafkaConfiguration.consumerProperties().get("bootstrap.servers"))
            .option("subscribe", kafkaConfiguration.topicsAsCSV(","))
            .load();

    var schema = new String(Files.readAllBytes(Paths.get("schema.avsc")));

    load.select(from_avro(col("value"), schema)).write().json("/tmp/spark/json");
 

Обратите внимание, что схема копируется как есть из самого значения записи.

Однако я получаю следующее исключение:

 Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -20
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:272)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:214)
 

В чем причина этой ошибки и как я могу ее исправить? Спасибо!

Ответ №1:

Мне нужно больше информации. Но у меня возникла аналогичная проблема, когда я попытался использовать сообщения avro от Кафки Конфлуэнта. Он генерирует сообщение с 5 байтами для хранения идентификатора SchemaRegitryID из службы SchemaRegistry и содержимого avro, которое находится в остальной части сообщения.

 # python example to slice bytes
schema_id = message[:5]
content = message[5:]
 

Более подробная информация в https://www.confluent.io/blog/consume-avro-data-from-kafka-topics-and-secured-schema-registry-with-databricks-confluent-cloud-on-azure/#parse-bytes с pyspark вместо Java/Scala (извините).

Комментарии:

1. Я действительно пытался пропустить и первые пять байтов, но безрезультатно :/