#spring-kafka
Вопрос:
код :
@KafkaListener(id = "${spring.kafka.consumer.listener-id}", idIsGroup = false, groupId = "${spring.kafka.consumer.group-id}", topics = "#{'${srm.xdp.topics}'.split(',')}", containerFactory = "consumerFactory", errorHandler = "consumerAwareErrorHandler") public void onDealMessage(Listlt;ConsumerRecordlt;String, Stringgt;gt; records, Acknowledgment ack) { MessageListenerContainer messageListenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId); messageListenerContainer.pause(); if (messageListenerContainer.isContainerPaused()) { System.out.println("container paused!!!"); } for (ConsumerRecordlt;String, Stringgt; record : records) { System.out.println(record); // business login dealwith } if (!messageListenerContainer.isContainerPaused()) { System.out.println("container not paused!!!"); } ack.acknowledge(); messageListenerContainer.resume(); }
Комментарии:
1. Контейнер приостановится, когда будут обработаны текущие записи, возвращенные из опроса. Приостановка и возобновление работы в одном и том же пакете не будут иметь никакого эффекта, так как вы приостанавливаете и немедленно возобновляете работу. Возможно, вы сможете точно объяснить, что вы пытаетесь сделать.