#apache-kafka #kafka-consumer-api #spring-kafka
#apache-kafka #kafka-consumer-api #spring-kafka
Вопрос:
Пример использования: учитывая тему со 100 сообщениями в теме kafka, я хочу прочитать сообщение со смещением 10 до смещения 20. Я мог бы извлечь данные из начального смещения. когда я достигаю конечного смещения, я написал код для остановки контейнера.Даже после выполнения кода потребитель может использовать дополнительные сообщения (со смещения 21). Он останавливается только после чтения всех сообщений в теме
@Service
public class Consumer1 implements MessageListener<String, GenericRecord> {
@Override
public void onMessage(ConsumerRecord<String, GenericRecord> data) {
log.info("feed record {}", data);
if (data.offset() == 20) {
feedService.stopConsumer();
}
}
}
@Service
public class FeedService{
// start logic here
public void stopConsumer() {
kafkaMessageListenerContainer.stop();
}
}
Примечание: я использую последнюю версию spring-kafka (2.6.4). Одно из наблюдений заключается в том, что выполняется метод остановки контейнера, но потребитель не закрывается.И никаких ошибок на выходе
Ответ №1:
stop()
Не завершает пакетный цикл текущих записей:
while (isRunning()) {
try {
pollAndInvoke();
}
catch (@SuppressWarnings(UNUSED) WakeupException e) {
// Ignore, we're stopping or applying immediate foreign acks
}
Это pollAndInvoke()
вызывает a KafkaConsumer.poll()
, получает некоторую коллекцию записей и вызывает вашу onMessage()
для каждой записи. В какой-то момент вы решаете вызвать остановку, но это не значит, что мы действительно находимся в конце этого списка записей, чтобы немедленно выйти.
Мы действительно останавливаемся на следующем цикле, когда это isRunning()
уже возвращается false
для нас.
Комментарии:
1. Будет ли пользователь удален при последнем сообщении в теме ? или он будет завершен после выборки нескольких записей (в пакете) после вызова stop() с определенным смещением?
2. Прямо сейчас он завершается, когда обрабатывается весь итератор записей. Как я уже сказал: вы останавливаетесь в середине пакета, но мы не проверяем это состояние до следующего цикла опроса. Это не означает, что последнее обработанное сообщение действительно является последним в теме. Вероятно, вы можете подумать о том, чтобы контролировать количество записей, которые вы извлекаете, в зависимости от конкретных потребительских параметров Kafka.
3. Вероятно, нам следует добавить параметр конфигурации для немедленной остановки (после обработки текущей записи). Пожалуйста, откройте проблему с GitHub.
4. @ArtemBilan можем ли мы настроить потребительское свойство max.poll.records. так что нам не нужно обрабатывать все записи в теме, но оставшиеся записи в пакете (настроенное количество записей) будут обработаны, и потребитель будет остановлен
5. @GaryRussell Я открыл проблему с git hub, но она была закрыта. Должен ли я воссоздать еще один?