#apache-spark #avro #confluent-schema-registry #spark-avro
Вопрос:
Я занимаюсь пакетной обработкой Кафки с помощью Spark. Запись сериализована как Avro. Я пытаюсь десериализовать значение, используя точную схему в самом сообщении, но получаю исключение неправильной записи. Вот мой код:
Dataset<Row> load = sparkSession
.read()
.format("kafka")
.option("kafka.bootstrap.servers", (String) kafkaConfiguration.consumerProperties().get("bootstrap.servers"))
.option("subscribe", kafkaConfiguration.topicsAsCSV(","))
.load();
var schema = new String(Files.readAllBytes(Paths.get("schema.avsc")));
load.select(from_avro(col("value"), schema)).write().json("/tmp/spark/json");
Обратите внимание, что схема копируется как есть из самого значения записи.
Однако я получаю следующее исключение:
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -20
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:272)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:214)
В чем причина этой ошибки и как я могу ее исправить? Спасибо!
Ответ №1:
Мне нужно больше информации. Но у меня возникла аналогичная проблема, когда я попытался использовать сообщения avro от Кафки Конфлуэнта. Он генерирует сообщение с 5 байтами для хранения идентификатора SchemaRegitryID из службы SchemaRegistry и содержимого avro, которое находится в остальной части сообщения.
# python example to slice bytes
schema_id = message[:5]
content = message[5:]
Более подробная информация в https://www.confluent.io/blog/consume-avro-data-from-kafka-topics-and-secured-schema-registry-with-databricks-confluent-cloud-on-azure/#parse-bytes с pyspark вместо Java/Scala (извините).
Комментарии:
1. Я действительно пытался пропустить и первые пять байтов, но безрезультатно :/