можем ли мы использовать один и тот же контейнер для разных тем?

#apache-kafka #spring-kafka

Вопрос:

У меня есть две разные темы для Кафки. Тип данных и структура данных (Json) одинаковы в обеих темах. Могу ли я использовать один и тот же контейнер прослушивателя для обеих тем? Я попробовал использовать, и мне удалось получить результаты, полученные из обеих тем.

Ниже приведены мои конфигурации.

 @Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
    ConcurrentKafkaListenerContainerFactoryConfigurer factoryConfigure,
    ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factoryConfigure.configure(factory, kafkaConsumerFactory);
    factory.setBatchListener(true);
    return factory;
}
 

Это мой слушатель.

 
@KafkaListener(id = "#{'${spring.kafka.listener.id}'}", topics = "#{'${spring.kafka.consumer.topic}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void getTopics( List<Request> model) {

    System.out.println("I am from topic1");
    System.out.println(model);

}

@KafkaListener(id = "#{'${spring.kafka.listener.id2}'}", topics = "#{'${spring.kafka.consumer.topic2}'}", groupId = "#{'${spring.kafka.consumer.group-id2}'}")
public void getTopics( List<Request> model) {

System.out.println("I am from topic2");
System.out.println(model);

}
 

Я явно не использую никаких аннотаций для указания имени контейнера.
Может ли это привести меня к какому-либо вопросу?

Ответ №1:

Неясно, о чем вы спрашиваете; фабрика контейнеров создаст отдельный контейнер для каждого слушателя.

topics Свойство представляет собой массив, поэтому вы можете использовать

 @KafkaListener(id = "#{'${spring.kafka.listener.id}'}", 
    topics = { "${spring.kafka.consumer.topic}",
               "${spring.kafka.consumer.topic2}" },
    groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void getTopics( List<Request> model) {

    System.out.println("I am from both topics");
    System.out.println(model);

}
 

Если вам нужно знать, из какой темы взята каждая запись, вы можете использовать

 public void getTopics( List<Request> model,
     @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics) {
 

Тема в индексе n предназначена для записи в индексе модели n.

Комментарии:

1. Спасибо Гэри за ответ. У меня есть отдельные слушатели, потому что я хочу, чтобы у них была другая логика обработки. Будет ли аннотация на containerFactory прослушивателя = «kafkaListenerContainerFactory» будет автоматически помещена здесь? Также можно ли расставить приоритеты по какой-либо теме ?

2. Да; смотрите javadoc для аннотации — это имя заводского компонента по умолчанию. Я не уверен, что вы подразумеваете под «расставлением приоритетов».

3. Я хотел приостановить потребление из одной темы(низкий приоритет) на некоторое время, если данные находятся в другой теме(высокий приоритет). Я хотел, чтобы записи по темам с более высоким приоритетом обрабатывались в первую очередь.

4. Вы можете использовать KafkaListenerEndpointRegistry компонент, чтобы получить ссылку на контейнеры (используя их id ). Затем вы можете приостановить/возобновить (или остановить/запустить) контейнер(ы) таким образом. Вы также можете установить для этого autoStartup свойства false значение, чтобы они не запускались автоматически. Также смотрите docs.spring.io/spring-kafka/docs/current/reference/html/…

5. Спасибо, это очень полезно