Десериализатор значений Json Кафки

#serialization #apache-kafka #deserialization #json-deserialization

Вопрос:

Я использую потребителя кафки со следующими свойствами:

 key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.connect.json.JsonDeserializer
 

Кафкапроизводитель(значение.сериализатор=org.apache.кафка.подключение.json.JsonSerializer) помещает записи JSON в тему, и этот потребитель читает из нее, с точки зрения функциональности она работает нормально, но проблема возникает, когда мой производитель отправляет сообщение, не относящееся к JSON (например: пустое сообщение).

В этом случае потребитель падает, и он не будет потреблять, пока это пустое сообщение не будет очищено(я сбросил смещение группы потребителей до последнего).

Есть ли какой-нибудь способ справиться с этим, возможно, используя какое-то свойство или что-то в этом роде

Ответ №1:

Потребительский API не имеет свойств обработки исключений десериализации, подобных потокам Кафки

Вам нужно будет создать свой собственный десериализатор, который обертывает json и обрабатывает любые ошибки

Вы можете найти класс SafeDeserializer в azkarra-commons полезным