#spring #spring-boot #apache-kafka #spring-cloud #kafka-consumer-api
Вопрос:
Я написал код для проверки работоспособности своего пользователя KafkaConsumer вот так:
@Component
class HealthCheckConsumer : ReactiveHealthIndicator {
@Autowired
private lateinit var kafkaListenerEndpointRegistry: KafkaListenerEndpointRegistry
override fun health(): Mono<Health> {
val container =
kafkaListenerEndpointRegistry.getListenerContainer("myGroupId")
println("get listener container" container)
return doHealthCheck(container)
.onErrorResume({ ex -> Mono.just(Health.Builder().down(ex).build()) })
}
private fun doHealthCheck( message: MessageListenerContainer): Mono<Health> {
if (message.isRunning)
return Mono.just(Health.up().build())
return Mono.error(Exception("an error was threw"))
}
}
- Но когда я запускаю его, контейнер переменной возвращает значение null, даже несмотря на то, что сообщение кафки было получено, а идентификатор группы соответствует свойствам spring.cloud.stream.bindings.accountAsisConsumer-in-0.group.
- В противном случае, когда я пытаюсь получить весь контейнер с помощью kafkaListenerEndpointRegistry.allListenerContainers, он возвращает пустую коллекцию.
Мне интересно, не пропустил ли я какую-то конфигурацию для этого, пожалуйста, помогите мне с этой проблемой. Заранее спасибо.
Ответ №1:
Держатель прослушивателя должен быть вызван после того, как потребитель получил сообщение. Другими словами, вы должны поместить строку ниже или вызвать содержащий ее метод, в котором тело метода является потребителем. kafkaListenerEndpointRegistry.getListenerContainer(«myGroupId»)