Как корректно завершить работу адаптера канала Kafka?

#spring-integration #spring-kafka

#spring-интеграция #spring-kafka

Вопрос:

Я использую a KafkaMessageDrivenChannelAdapter для получения сообщений из темы Kafka. Эти сообщения размещаются на канале и обрабатываются синхронно одним потоком.

Я хотел бы иметь возможность корректно завершить работу системы, что означает, что больше не используются сообщения из темы, а обрабатываемые сообщения должны быть завершены перед завершением работы.

Я могу вызвать stop() метод на канале KafkaMessageDrivenChannelAdapter , но я не уверен, что происходит с потоком, который в данный момент может обрабатывать сообщение. Чтобы исследовать это поведение, я использовал фиктивный компонент, который просто спит некоторое время, чтобы проверить, что произойдет, если я его выключу. Оказывается, что InterruptedException выбрасывается an , что может означать, что последующая обработка может не завершиться нормально.

Каков правильный способ корректного завершения работы адаптера канала Kafka, который гарантирует, что обрабатываемые сообщения смогут завершиться?

Комментарии:

1. Такого рода вопросы предполагают, что с нашей стороны будет использоваться какой-либо тестовый пример или приложение Spring Boot. OTOH, поделитесь, пожалуйста, поделитесь также трассировкой стека. Просматривая KafkaMessageDrivenChannelAdapter код, я не вижу, что прерывает поток. Он просто существует обычно из while (this.active) {

2. Но можете ли вы подтвердить, что вызов stop() предназначен для корректного завершения работы адаптера с описанными мной характеристиками?

3. Таково его намерение. Все работает нормально, пока вы не подтвердите обратное.

4. Я думаю, @ArtemBilan предполагает, что вы используете 2.x; более старая версия 1.x с клиентом kafka 0.8.x.x может работать по-другому.