#spring-boot #apache-kafka #kafka-consumer-api #spring-kafka
Вопрос:
Я предоставил 2 потребительских магазина
- работа с обычными темами
- обработка повторных тем
Также 2 метода @KafkaListener для вышеуказанных случаев
Я пытаюсь опубликовать события, вызвавшие сбой/исключение, в новой теме «повторная попытка события», чтобы другим событиям не нужно было ждать, пока не будет устранена ошибка, публикация работает(без ошибок), но она не используется мгновенно в методе @KafkaListener, но эти события действительно используются при перезапуске сервера.
Конфигурация Кафки
// @Bean
public ConsumerFactory<String, Message> consumerFactoryMain() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
config.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaMaxPollRecords);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new ErrorHandlingDeserializer2(new JsonDeserializer(Message.class)));
}
@Bean("kafkaListenerContainerFactoryMain")
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactoryMain() {
ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactoryMain());
factory.setConcurrency(kafkaListenerConcurrency);
return factory;
}
// @Bean
public ConsumerFactory<String, Message> consumerFactoryRetry() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
config.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaMaxPollRecords);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new ErrorHandlingDeserializer2(new JsonDeserializer(Message.class)));
}
@Bean("kafkaListenerContainerFactoryRetry")
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactoryRetry() {
ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactoryRetry());
factory.setConcurrency(kafkaListenerConcurrency);
return factory;
}
Потребитель Кафки
@KafkaListener(topics = { "order-updated", "order-created", "order-rejected", "order-shipped", "order-cancelled",
"order-delivered", "order-packed", "order-accepted", "update", "created", "reject", "shipped",
"cancel" }, groupId = "group_id",containerFactory = "kafkaListenerContainerFactoryMain")
public void consumeOmsEvents(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Payload(required = false) Message message) {
System.out.println("Got NEW Event for topic from kafka: " topic);
System.out.println("Received: " topic " to: " message.getTargetUrl());
try {
subscriberService.sendWebhook(message);
} catch (Exception e) {
if (message.getRetryCount() < retryCount) {
kafkaTemplate.send("event-retry", message);
}
}
System.out.println("Success: " message.getTargetUrl());
}
@KafkaListener(topics = { "event-retry" }, groupId = "group_id",containerFactory = "kafkaListenerContainerFactoryRetry")
public void consumeRetryOmsEvents(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Payload(required = false) Message message) {
System.out.println("xxxxxxxxxxxxxxxxxx Got Retry Event for topic from kafka: xxxxxxxxxxxxxxxxxx" topic);
System.out.println("Received Retry: " topic " to: " message.getTargetUrl() " count =" message.getRetryCount());
try {
if (message.getRetryCount() < retryCount) {
subscriberService.sendWebhook(message);
} else {
subscriberService.disableSubscriber(message);
}
} catch (Exception e) {
if (message.getRetryCount() < retryCount) {
message.setRetryCount(message.getRetryCount() 1);
kafkaTemplate.send("event-retry", message);
}
}
}
Комментарии:
1. Вы не установили
auto.offset.reset
, поэтому потребитель начнет с самого конца темы.2. Ведение журнала отладки должно помочь вам понять, что не так.