#apache-kafka #spring-kafka
Вопрос:
У меня есть две разные темы для Кафки. Тип данных и структура данных (Json) одинаковы в обеих темах. Могу ли я использовать один и тот же контейнер прослушивателя для обеих тем? Я попробовал использовать, и мне удалось получить результаты, полученные из обеих тем.
Ниже приведены мои конфигурации.
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer factoryConfigure,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factoryConfigure.configure(factory, kafkaConsumerFactory);
factory.setBatchListener(true);
return factory;
}
Это мой слушатель.
@KafkaListener(id = "#{'${spring.kafka.listener.id}'}", topics = "#{'${spring.kafka.consumer.topic}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void getTopics( List<Request> model) {
System.out.println("I am from topic1");
System.out.println(model);
}
@KafkaListener(id = "#{'${spring.kafka.listener.id2}'}", topics = "#{'${spring.kafka.consumer.topic2}'}", groupId = "#{'${spring.kafka.consumer.group-id2}'}")
public void getTopics( List<Request> model) {
System.out.println("I am from topic2");
System.out.println(model);
}
Я явно не использую никаких аннотаций для указания имени контейнера.
Может ли это привести меня к какому-либо вопросу?
Ответ №1:
Неясно, о чем вы спрашиваете; фабрика контейнеров создаст отдельный контейнер для каждого слушателя.
topics
Свойство представляет собой массив, поэтому вы можете использовать
@KafkaListener(id = "#{'${spring.kafka.listener.id}'}",
topics = { "${spring.kafka.consumer.topic}",
"${spring.kafka.consumer.topic2}" },
groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void getTopics( List<Request> model) {
System.out.println("I am from both topics");
System.out.println(model);
}
Если вам нужно знать, из какой темы взята каждая запись, вы можете использовать
public void getTopics( List<Request> model,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics) {
Тема в индексе n предназначена для записи в индексе модели n.
Комментарии:
1. Спасибо Гэри за ответ. У меня есть отдельные слушатели, потому что я хочу, чтобы у них была другая логика обработки. Будет ли аннотация на containerFactory прослушивателя = «kafkaListenerContainerFactory» будет автоматически помещена здесь? Также можно ли расставить приоритеты по какой-либо теме ?
2. Да; смотрите javadoc для аннотации — это имя заводского компонента по умолчанию. Я не уверен, что вы подразумеваете под «расставлением приоритетов».
3. Я хотел приостановить потребление из одной темы(низкий приоритет) на некоторое время, если данные находятся в другой теме(высокий приоритет). Я хотел, чтобы записи по темам с более высоким приоритетом обрабатывались в первую очередь.
4. Вы можете использовать
KafkaListenerEndpointRegistry
компонент, чтобы получить ссылку на контейнеры (используя ихid
). Затем вы можете приостановить/возобновить (или остановить/запустить) контейнер(ы) таким образом. Вы также можете установить для этогоautoStartup
свойстваfalse
значение, чтобы они не запускались автоматически. Также смотрите docs.spring.io/spring-kafka/docs/current/reference/html/…5. Спасибо, это очень полезно