Как получить MessageListenerContainer с помощью KafkaListenerEndpointRegistry

#spring #spring-boot #apache-kafka #spring-cloud #kafka-consumer-api

Вопрос:

Я написал код для проверки работоспособности своего пользователя KafkaConsumer вот так:

   @Component
   class HealthCheckConsumer : ReactiveHealthIndicator {
        @Autowired
        private lateinit var kafkaListenerEndpointRegistry: KafkaListenerEndpointRegistry


        override fun health(): Mono<Health> {
         val container =
            kafkaListenerEndpointRegistry.getListenerContainer("myGroupId")
        
        println("get listener container"   container)
        
        return doHealthCheck(container)
            .onErrorResume({ ex -> Mono.just(Health.Builder().down(ex).build()) })
    }

    private fun doHealthCheck( message: MessageListenerContainer): Mono<Health> {
        if (message.isRunning)
            return Mono.just(Health.up().build())
        return Mono.error(Exception("an error was threw"))
      }
    }
 
  1. Но когда я запускаю его, контейнер переменной возвращает значение null, даже несмотря на то, что сообщение кафки было получено, а идентификатор группы соответствует свойствам spring.cloud.stream.bindings.accountAsisConsumer-in-0.group.
  2. В противном случае, когда я пытаюсь получить весь контейнер с помощью kafkaListenerEndpointRegistry.allListenerContainers, он возвращает пустую коллекцию.

Мне интересно, не пропустил ли я какую-то конфигурацию для этого, пожалуйста, помогите мне с этой проблемой. Заранее спасибо.

Ответ №1:

Держатель прослушивателя должен быть вызван после того, как потребитель получил сообщение. Другими словами, вы должны поместить строку ниже или вызвать содержащий ее метод, в котором тело метода является потребителем. kafkaListenerEndpointRegistry.getListenerContainer(«myGroupId»)