Кафка публикует не потребляя в @KafkaListener

#spring-boot #apache-kafka #kafka-consumer-api #spring-kafka

Вопрос:

Я предоставил 2 потребительских магазина

  1. работа с обычными темами
  2. обработка повторных тем

Также 2 метода @KafkaListener для вышеуказанных случаев

Я пытаюсь опубликовать события, вызвавшие сбой/исключение, в новой теме «повторная попытка события», чтобы другим событиям не нужно было ждать, пока не будет устранена ошибка, публикация работает(без ошибок), но она не используется мгновенно в методе @KafkaListener, но эти события действительно используются при перезапуске сервера.

Конфигурация Кафки

      // @Bean
        public ConsumerFactory<String, Message> consumerFactoryMain() {
            Map<String, Object> config = new HashMap<>();
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
            config.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
            config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaMaxPollRecords);
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    
            return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
                    new ErrorHandlingDeserializer2(new JsonDeserializer(Message.class)));
    
        }
    
        @Bean("kafkaListenerContainerFactoryMain")
        public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactoryMain() {
            ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory();
            factory.setConsumerFactory(consumerFactoryMain());
            factory.setConcurrency(kafkaListenerConcurrency);
            return factory;
        }
    
        // @Bean
        public ConsumerFactory<String, Message> consumerFactoryRetry() {
            Map<String, Object> config = new HashMap<>();
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
            config.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
            config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaMaxPollRecords);
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
                    new ErrorHandlingDeserializer2(new JsonDeserializer(Message.class)));
        }
    
        @Bean("kafkaListenerContainerFactoryRetry")
        public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactoryRetry() {
            ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory();
            factory.setConsumerFactory(consumerFactoryRetry());
            factory.setConcurrency(kafkaListenerConcurrency);
            return factory;
        }
 

Потребитель Кафки

     @KafkaListener(topics = { "order-updated", "order-created", "order-rejected", "order-shipped", "order-cancelled",
                "order-delivered", "order-packed", "order-accepted", "update", "created", "reject", "shipped",
                "cancel" }, groupId = "group_id",containerFactory = "kafkaListenerContainerFactoryMain")
        public void consumeOmsEvents(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                @Payload(required = false) Message message) {
            System.out.println("Got NEW Event for topic from kafka: "   topic);
            System.out.println("Received: "   topic   " to: "   message.getTargetUrl());
            try {
                subscriberService.sendWebhook(message);
            } catch (Exception e) {
                if (message.getRetryCount() < retryCount) {
                    kafkaTemplate.send("event-retry", message);
                }
            }
            System.out.println("Success: "   message.getTargetUrl());
        }
    
        @KafkaListener(topics = { "event-retry" }, groupId = "group_id",containerFactory = "kafkaListenerContainerFactoryRetry")
        public void consumeRetryOmsEvents(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                @Payload(required = false) Message message) {
            System.out.println("xxxxxxxxxxxxxxxxxx Got Retry Event for topic from kafka: xxxxxxxxxxxxxxxxxx"   topic);
            System.out.println("Received Retry: "   topic   " to: "   message.getTargetUrl()   " count =" message.getRetryCount());
            try {
                if (message.getRetryCount() < retryCount) {
                    subscriberService.sendWebhook(message);
                } else {
                    subscriberService.disableSubscriber(message);
                }
            } catch (Exception e) {
                if (message.getRetryCount() < retryCount) {
                    message.setRetryCount(message.getRetryCount()   1);
                    kafkaTemplate.send("event-retry", message);
                }
    
            }
        }

 

Комментарии:

1. Вы не установили auto.offset.reset , поэтому потребитель начнет с самого конца темы.

2. Ведение журнала отладки должно помочь вам понять, что не так.