#json #spring #apache-kafka #jackson #marshalling
#json #spring #apache-кафка #джексон #сортировка
Вопрос:
Я пытаюсь отправить сообщение в kafka и получаю это сообщение в другой службе. Я добавляю
org.springframework.kafka.support.serializer.JsonDeserializer //consumer side
org.springframework.kafka.support.serializer.JsonSerializer //produser side
И я пытаюсь отправить объект с полем:
org.w3c.dom.Element;
@XmlAnyElement
protected Element any;
Сообщение отправлено успешно, но на стороне потребителя я получаю ошибку:
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 114, 101...
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Problem deserializing property 'any' (expected type: [simple type, class org.w3c.dom.Element]; actual type: `com.sun.org.apache.xerces.internal.dom.DeferredDocumentImpl`), problem: java.lang.ClassCastException@4db05b26
Я показываю это сообщение в topik с помощью этой команды:
./kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-topic --from-beginning
И это сообщение выглядит правильно:
"to": "eyJzaWQi...",
"messagePrimaryContent": {
"any": "<?xml version="1.0" encoding="UTF-16"?>n<ImportCh...(another xml text)"
},
"personalSignature": null,
Как мне десериализовать это сообщение?
Если я отправлю этот объект без org.w3c.dom.Element
, все будет работать правильно.
Ответ №1:
Существует проблема с десериализацией записей в 123, 34, 114, 101 .., из-за неправильной схемы. На стороне потребителя у вас есть другой формат для десериализации. Вы можете избежать этих данных, выполнив некоторый try-catch и избегая записи такого типа, или же измените правильную схему на стороне потребителя.
Код, позволяющий избежать неправильных записей схемы.
try {
records = consumer.poll(10000);
} catch (SerializationException e) {
String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
String topics = s.split("-")[0];
int offset = Integer.valueOf(s.split("offset ")[1]);
int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);
TopicPartition topicPartition = new TopicPartition(topics, partition);
//log.info("Skipping " topic "-" partition " offset " offset);
consumer.seek(topicPartition, offset 1);
}