Перенести опрос apache kafka в spring kafka

#spring-kafka

#spring-kafka

Вопрос:

В приложении с загрузкой с помощью spring (версия 2.1.4) требуется перенести apache-kafka на spring-kafka. Текущий потребитель kafka выполняет: 1) Компонент KafkaConsumer инициализируется во время приложения 2) В нем установлен раздел темы «0» 3) опрос данных с помощью apache KafkaConsumer в ConsumerRecord 4) Собственное приложение имеет механизм повторных попыток для ожидания и повторного опроса до max_retry

Устаревший код выглядит ниже:

  while (!done.get()) {
    ConsumerRecords<byte[], <byte[]> records = kafkaConsumer.poll(<MAX_VALUE>);
    if (records.isEmpty()) {
        retryCount  ;
        Thread.sleep(<some_time>);
    } else {
    // Process records;
    }
    if (retryCount > <max_retry_count>) {
    done.set(true); 
    }    
  }
  

Попробовал следующие подходы:
1) Используя аннотацию spring kafka (@KafkaListener), но это не позволяет нам контролировать опрос.
2) Создан «ConcurrentMessageListenerContainer», и setupMessageListener добавляет записи в очередь для опроса. Это дает нам контроль над потребителем.

Я хотел знать, двигаюсь ли я в правильном направлении? Что было бы лучшим решением для достижения вышеуказанных требований с использованием spring-kafka?

Ответ №1:

Неясно, что вы подразумеваете под «контролем над потребителем». Создание контейнера — это то же самое, что и использование a @KafkaListener (контейнер создается под обложками).

Spring использует подход, основанный на «управляемых сообщениях».

Вы можете установить idleEventInterval , и контейнер опубликует a ListenerContainerIdleEvent , если за это время не будет получено никаких записей. Вы можете прослушивать эти события с ApplicationListener помощью компонента или @EventListener метода.

Комментарии:

1. Спасибо, Гэри. «Контроль над потребителем» означает, что нам нужно будет запускать kafka consumer только для определенного события. Использование @KafkaListener не имеет такого преимущества, следовательно, с помощью метода start() KafkaMessageListenerContainer мы можем запустить контейнер-потребитель kafka только для этого события.

2. У вас есть тот же «контроль» над @KafkaListener s — см. @KafkaListener Управление жизненным циклом в документации.