#spring-boot #apache-kafka #kafka-consumer-api #spring-kafka
Вопрос:
—>Весенняя загрузка (v2.4.7) Я столкнулся с проблемой при попытке прочитать сообщение от потребителя КАФКИ. У меня есть конфигурация потребителя Кафки, и я получаю следующую ошибку
«com.база.сотрудник.dto.employeeexecution.EmployeeExecutionEBO» —>У меня нет этого в моем проекте. «com.base.person.dtos.personexecution.PersonEBO»—> У меня есть это в моем проекте
| ERROR | [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] | org.springframework.core.log.LogAccessor.error | Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145)
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1439)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1136)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition uat-entp-intg-employeeexecution-7 at offset 139475. If needed, please seek past the record to continue consumption.
Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.base.employee.dtos.employeeexecution.EmployeeExecutionEBO]; nested exception is java.lang.ClassNotFoundException: com.base.employee.dtos.employeeexecution.EmployeeExecutionEBO
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139)
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:100)
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:504)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1303)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1283)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1174)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1087)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.base.employee.dtos.employeeexecution.EmployeeExecutionEBO
KafkaConsumerConfig.java
@Slf4j
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.offset-reset-policy}")
private String offsetResetPolicy;
@Value("${spring.kafka.group-id}")
private String groupId;
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.max-poll-interval}")
private Integer maxPollInterval;
@Value("${spring.kafka.max-poll-records}")
private Integer maxPollRecords;
@Value("${spring.kafka.session-timeout}")
private Integer sessionTimeout;
@Value("${spring.kafka.trusted-packages}")
private String trustedPacakges;
public KafkaConsumerConfig() {}
@Bean
public Map < String, Object > consumerConfigs() {
Map < String, Object > props = new HashMap < > ();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetResetPolicy);
props.put(JsonDeserializer.TRUSTED_PACKAGES, trustedPacakges);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); //
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT);
return props;
}
@Bean
public ConsumerFactory < String, PersonEBO > consumerFactory() {
return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(),
new ErrorHandlingDeserializer < > (new JsonDeserializer < > (PersonEBO.class, false)));
}
@Autowired
@Bean
public ConcurrentKafkaListenerContainerFactory < String, PersonEBO > kafkaConsumer() {
ConcurrentKafkaListenerContainerFactory < String, PersonEBO > factory = new ConcurrentKafkaListenerContainerFactory < > ();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setSyncCommits(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setAckDiscarded(true);
factory.setConcurrency(1);
factory.setErrorHandler(errorHandler()); factory.getContainerProperties().setAuthorizationExceptionRetryInterval(Duration.ofMillis(30000));
factory.setMessageConverter(new JsonMessageConverter());
return factory;
}
@Bean
public SeekToCurrentErrorHandler errorHandler() {
return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer(), new FixedBackOff(0 L, 2 L));
}
@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(), (record, ex) - > {
return new TopicPartition(record.topic() "-DLQ", -1);
});
}
public KafkaOperations < String, PersonEBO > getEventKafkaTemplate() { // producer to DLQ
return new KafkaTemplate < String, PersonEBO > (producerFactory());
}
@Bean
public ProducerFactory <String, PersonEBO> producerFactory() {
Map < String, Object > configProperties = new HashMap < String, Object > ();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
configProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000");
configProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT);
return new DefaultKafkaProducerFactory <String, PersonEBO> (configProperties);
}
}
Также в Consumer.java я уточнил
KafkaConsumerService.java
@KafkaListener(topics = "${spring.kafka.topic}", groupId = "${spring.kafka.group-id}", properties = {"spring.json.value.default.type=com.base.person.dtos.personexecution.PersonEBO" }, containerFactory = "kafkaListenerContainerFactory")
public void consumeShipment(ConsumerRecord<String, PersonEBO> record, Acknowledgment acknowledgment,
@Headers MessageHeaders headers) throws ExecutionException, InterruptedException {}
Ответ №1:
Посмотрите JsonSerializer.ADD_TYPE_INFO_HEADERS
и установите его false
на стороне производителя. Смотрите больше в документах: https://docs.spring.io/spring-kafka/docs/current/reference/html/#serdes-json-config.
Или наоборот: JsonDeserializer.USE_TYPE_INFO_HEADERS
false
тоже,
Комментарии:
1. JsonDeserializer. Значение USE_TYPE_INFO_HEADERS равно false, это свойства конфигурации потребителя. У нас нет привилегии изменять свойства на стороне производителя
2. Я только что добавил JsonDeserializer. USE_TYPE_INFO_HEADERS в свойствах конфигурации потребителя, и это дает мне ту же ошибку
3. Извините. Это действительно не сработает, так как вы делаете явное
new JsonDeserializer < > (PersonEBO.class, false)
. Подумайте, чтобы назвать егоsetUseTypeHeaders(false)
. Все, что вы настраиваете из кода, не использует эти свойства конфигурации. Включая то, что упомянутоJsonDeserializer.TRUSTED_PACKAGES
в вашем фрагменте кода. Передумайте делать это с помощью соответствующих установщиков вJsonDeserializer
экземпляре.4. Не могу найти врача!! есть ли какой-либо пример того, как я могу реализовать то же самое в приведенной выше конфигурации java
5. Не уверен, что вы видите там препятствие. У тебя есть это
new ErrorHandlingDeserializer < > (new JsonDeserializer < > (PersonEBO.class, false)));
. Итак, вы извлекаетеJsonDeserializer
переменную как переменную и вызываете ее соответствующие установщики:setUseTypeHeaders(false)
иaddTrustedPackages(trustedPacakges)
. Или его стиль строителя:new JsonDeserializer < > (PersonEBO.class, false).ignoreTypeHeaders().trustedPackages(trustedPacakges)