org.apache.kafka.common.errors.Исключение сериализации с полем org.w3c.dom.Element

#json #spring #apache-kafka #jackson #marshalling

#json #spring #apache-кафка #джексон #сортировка

Вопрос:

Я пытаюсь отправить сообщение в kafka и получаю это сообщение в другой службе. Я добавляю

 org.springframework.kafka.support.serializer.JsonDeserializer //consumer side
org.springframework.kafka.support.serializer.JsonSerializer //produser side
  

И я пытаюсь отправить объект с полем:

 org.w3c.dom.Element;

@XmlAnyElement
protected Element any;
  

Сообщение отправлено успешно, но на стороне потребителя я получаю ошибку:

 Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 114, 101...

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Problem deserializing property 'any' (expected type: [simple type, class org.w3c.dom.Element]; actual type: `com.sun.org.apache.xerces.internal.dom.DeferredDocumentImpl`), problem: java.lang.ClassCastException@4db05b26
  

Я показываю это сообщение в topik с помощью этой команды:

 ./kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-topic  --from-beginning
  

И это сообщение выглядит правильно:

 "to": "eyJzaWQi...",
 "messagePrimaryContent": {
    "any": "<?xml version="1.0" encoding="UTF-16"?>n<ImportCh...(another xml text)"
 },
 "personalSignature": null,
  

Как мне десериализовать это сообщение?

Если я отправлю этот объект без org.w3c.dom.Element , все будет работать правильно.

Ответ №1:

Существует проблема с десериализацией записей в 123, 34, 114, 101 .., из-за неправильной схемы. На стороне потребителя у вас есть другой формат для десериализации. Вы можете избежать этих данных, выполнив некоторый try-catch и избегая записи такого типа, или же измените правильную схему на стороне потребителя.

Код, позволяющий избежать неправильных записей схемы.

 try {
                    records = consumer.poll(10000);
                } catch (SerializationException e) {
                    String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
                    String topics = s.split("-")[0];
                    int offset = Integer.valueOf(s.split("offset ")[1]);
                    int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);

                    TopicPartition topicPartition = new TopicPartition(topics, partition);
                    //log.info("Skipping "   topic   "-"   partition   " offset "   offset);
                    consumer.seek(topicPartition, offset   1);
                }