KafkaMessageListenerContainer.stop() не останавливает прием сообщений в прослушивателе сообщений

#apache-kafka #kafka-consumer-api #spring-kafka

#apache-kafka #kafka-consumer-api #spring-kafka

Вопрос:

Пример использования: учитывая тему со 100 сообщениями в теме kafka, я хочу прочитать сообщение со смещением 10 до смещения 20. Я мог бы извлечь данные из начального смещения. когда я достигаю конечного смещения, я написал код для остановки контейнера.Даже после выполнения кода потребитель может использовать дополнительные сообщения (со смещения 21). Он останавливается только после чтения всех сообщений в теме

 @Service
public class Consumer1  implements MessageListener<String, GenericRecord> {

 @Override
  public void onMessage(ConsumerRecord<String, GenericRecord> data) {
    log.info("feed record {}", data);
    if (data.offset() == 20) {
      feedService.stopConsumer();
    }
  }
}

@Service
public class FeedService{

    // start logic here

   public void stopConsumer() {
    kafkaMessageListenerContainer.stop();
  }

}
 

Примечание: я использую последнюю версию spring-kafka (2.6.4). Одно из наблюдений заключается в том, что выполняется метод остановки контейнера, но потребитель не закрывается.И никаких ошибок на выходе

Ответ №1:

stop() Не завершает пакетный цикл текущих записей:

 while (isRunning()) {
            try {
                pollAndInvoke();
            }
            catch (@SuppressWarnings(UNUSED) WakeupException e) {
                // Ignore, we're stopping or applying immediate foreign acks
            }
 

Это pollAndInvoke() вызывает a KafkaConsumer.poll() , получает некоторую коллекцию записей и вызывает вашу onMessage() для каждой записи. В какой-то момент вы решаете вызвать остановку, но это не значит, что мы действительно находимся в конце этого списка записей, чтобы немедленно выйти.

Мы действительно останавливаемся на следующем цикле, когда это isRunning() уже возвращается false для нас.

Комментарии:

1. Будет ли пользователь удален при последнем сообщении в теме ? или он будет завершен после выборки нескольких записей (в пакете) после вызова stop() с определенным смещением?

2. Прямо сейчас он завершается, когда обрабатывается весь итератор записей. Как я уже сказал: вы останавливаетесь в середине пакета, но мы не проверяем это состояние до следующего цикла опроса. Это не означает, что последнее обработанное сообщение действительно является последним в теме. Вероятно, вы можете подумать о том, чтобы контролировать количество записей, которые вы извлекаете, в зависимости от конкретных потребительских параметров Kafka.

3. Вероятно, нам следует добавить параметр конфигурации для немедленной остановки (после обработки текущей записи). Пожалуйста, откройте проблему с GitHub.

4. @ArtemBilan можем ли мы настроить потребительское свойство max.poll.records. так что нам не нужно обрабатывать все записи в теме, но оставшиеся записи в пакете (настроенное количество записей) будут обработаны, и потребитель будет остановлен

5. @GaryRussell Я открыл проблему с git hub, но она была закрыта. Должен ли я воссоздать еще один?