#spring-kafka
#spring-kafka
Вопрос:
В приложении с загрузкой с помощью spring (версия 2.1.4) требуется перенести apache-kafka на spring-kafka. Текущий потребитель kafka выполняет: 1) Компонент KafkaConsumer инициализируется во время приложения 2) В нем установлен раздел темы «0» 3) опрос данных с помощью apache KafkaConsumer в ConsumerRecord 4) Собственное приложение имеет механизм повторных попыток для ожидания и повторного опроса до max_retry
Устаревший код выглядит ниже:
while (!done.get()) {
ConsumerRecords<byte[], <byte[]> records = kafkaConsumer.poll(<MAX_VALUE>);
if (records.isEmpty()) {
retryCount ;
Thread.sleep(<some_time>);
} else {
// Process records;
}
if (retryCount > <max_retry_count>) {
done.set(true);
}
}
Попробовал следующие подходы:
1) Используя аннотацию spring kafka (@KafkaListener), но это не позволяет нам контролировать опрос.
2) Создан «ConcurrentMessageListenerContainer», и setupMessageListener добавляет записи в очередь для опроса. Это дает нам контроль над потребителем.
Я хотел знать, двигаюсь ли я в правильном направлении? Что было бы лучшим решением для достижения вышеуказанных требований с использованием spring-kafka?
Ответ №1:
Неясно, что вы подразумеваете под «контролем над потребителем». Создание контейнера — это то же самое, что и использование a @KafkaListener
(контейнер создается под обложками).
Spring использует подход, основанный на «управляемых сообщениях».
Вы можете установить idleEventInterval
, и контейнер опубликует a ListenerContainerIdleEvent
, если за это время не будет получено никаких записей. Вы можете прослушивать эти события с ApplicationListener
помощью компонента или @EventListener
метода.
Комментарии:
1. Спасибо, Гэри. «Контроль над потребителем» означает, что нам нужно будет запускать kafka consumer только для определенного события. Использование @KafkaListener не имеет такого преимущества, следовательно, с помощью метода start() KafkaMessageListenerContainer мы можем запустить контейнер-потребитель kafka только для этого события.
2. У вас есть тот же «контроль» над
@KafkaListener
s — см.@KafkaListener
Управление жизненным циклом в документации.