org.springframework.обмен сообщениями.конвертер.Исключение MessageConversionException при чтении сообщения от потребителя КАФКИ

#spring-boot #apache-kafka #kafka-consumer-api #spring-kafka

Вопрос:

—>Весенняя загрузка (v2.4.7) Я столкнулся с проблемой при попытке прочитать сообщение от потребителя КАФКИ. У меня есть конфигурация потребителя Кафки, и я получаю следующую ошибку

«com.база.сотрудник.dto.employeeexecution.EmployeeExecutionEBO» —>У меня нет этого в моем проекте. «com.base.person.dtos.personexecution.PersonEBO»—> У меня есть это в моем проекте

 | ERROR | [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] | org.springframework.core.log.LogAccessor.error | Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145)
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1439)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1136)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition uat-entp-intg-employeeexecution-7 at offset 139475. If needed, please seek past the record to continue consumption.
Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.base.employee.dtos.employeeexecution.EmployeeExecutionEBO]; nested exception is java.lang.ClassNotFoundException: com.base.employee.dtos.employeeexecution.EmployeeExecutionEBO
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139)
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:100)
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:504)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1303)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1283)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1174)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1087)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.base.employee.dtos.employeeexecution.EmployeeExecutionEBO
 

KafkaConsumerConfig.java

 @Slf4j
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.offset-reset-policy}")
    private String offsetResetPolicy;
    @Value("${spring.kafka.group-id}")
    private String groupId;
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.max-poll-interval}")
    private Integer maxPollInterval;

    @Value("${spring.kafka.max-poll-records}")
    private Integer maxPollRecords;

    @Value("${spring.kafka.session-timeout}")
    private Integer sessionTimeout;
    @Value("${spring.kafka.trusted-packages}")
    private String trustedPacakges;

    public KafkaConsumerConfig() {}

    @Bean
    public Map < String, Object > consumerConfigs() {
        Map < String, Object > props = new HashMap < > ();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetResetPolicy);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, trustedPacakges);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); //
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT);

        return props;
    }

    @Bean
    public ConsumerFactory < String, PersonEBO > consumerFactory() {
        return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(),
            new ErrorHandlingDeserializer < > (new JsonDeserializer < > (PersonEBO.class, false)));
    }

    @Autowired
    @Bean
    public ConcurrentKafkaListenerContainerFactory < String, PersonEBO > kafkaConsumer() {
        ConcurrentKafkaListenerContainerFactory < String, PersonEBO > factory = new ConcurrentKafkaListenerContainerFactory < > ();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setAckDiscarded(true);
        factory.setConcurrency(1);
        factory.setErrorHandler(errorHandler());        factory.getContainerProperties().setAuthorizationExceptionRetryInterval(Duration.ofMillis(30000));
        factory.setMessageConverter(new JsonMessageConverter());

        return factory;
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler() {
        return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer(), new FixedBackOff(0 L, 2 L));
    }

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
        return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(), (record, ex) - > {
            return new TopicPartition(record.topic()   "-DLQ", -1);
        });
    }

    public KafkaOperations < String, PersonEBO > getEventKafkaTemplate() { // producer to DLQ
        return new KafkaTemplate < String, PersonEBO > (producerFactory());

    }

    @Bean
    public ProducerFactory <String, PersonEBO> producerFactory() {
        Map < String, Object > configProperties = new HashMap < String, Object > ();
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
        configProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000");
        configProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT);
        return new DefaultKafkaProducerFactory <String, PersonEBO> (configProperties);
    }

}
 

Также в Consumer.java я уточнил

KafkaConsumerService.java

  @KafkaListener(topics = "${spring.kafka.topic}", groupId = "${spring.kafka.group-id}", properties = {"spring.json.value.default.type=com.base.person.dtos.personexecution.PersonEBO" }, containerFactory = "kafkaListenerContainerFactory")
    public void consumeShipment(ConsumerRecord<String, PersonEBO> record, Acknowledgment acknowledgment,
            @Headers MessageHeaders headers) throws ExecutionException, InterruptedException {}
 

Ответ №1:

Посмотрите JsonSerializer.ADD_TYPE_INFO_HEADERS и установите его false на стороне производителя. Смотрите больше в документах: https://docs.spring.io/spring-kafka/docs/current/reference/html/#serdes-json-config.

Или наоборот: JsonDeserializer.USE_TYPE_INFO_HEADERS false тоже,

Комментарии:

1. JsonDeserializer. Значение USE_TYPE_INFO_HEADERS равно false, это свойства конфигурации потребителя. У нас нет привилегии изменять свойства на стороне производителя

2. Я только что добавил JsonDeserializer. USE_TYPE_INFO_HEADERS в свойствах конфигурации потребителя, и это дает мне ту же ошибку

3. Извините. Это действительно не сработает, так как вы делаете явное new JsonDeserializer < > (PersonEBO.class, false) . Подумайте, чтобы назвать его setUseTypeHeaders(false) . Все, что вы настраиваете из кода, не использует эти свойства конфигурации. Включая то, что упомянуто JsonDeserializer.TRUSTED_PACKAGES в вашем фрагменте кода. Передумайте делать это с помощью соответствующих установщиков в JsonDeserializer экземпляре.

4. Не могу найти врача!! есть ли какой-либо пример того, как я могу реализовать то же самое в приведенной выше конфигурации java

5. Не уверен, что вы видите там препятствие. У тебя есть это new ErrorHandlingDeserializer < > (new JsonDeserializer < > (PersonEBO.class, false))); . Итак, вы извлекаете JsonDeserializer переменную как переменную и вызываете ее соответствующие установщики: setUseTypeHeaders(false) и addTrustedPackages(trustedPacakges) . Или его стиль строителя: new JsonDeserializer < > (PersonEBO.class, false).ignoreTypeHeaders().trustedPackages(trustedPacakges)