#spring-kafka
Вопрос:
Я создал компонент для массива тем, и во время выполнения я добавил некоторые темы в этот массив тем, но потребитель не обновил тему и все еще использует первые темы внутри массива тем. я бы хотел, чтобы потребитель добавил эти новые темы и начал потреблять с них
@Autowired
private String[] topicArray;
@KafkaListener(topics = "#{topicArray}", groupId = "MyGroup")
public void listen(...) {
...
}
Ответ №1:
Нет; свойство оценивается один раз во время инициализации.
Вы не можете добавлять темы в существующий контейнер прослушивателя во время выполнения.
Однако вы можете сделать свой компонент прослушивателя прототипом компонента и создавать новый контейнер каждый раз, когда захотите прослушать новые темы.
Вот пример:
@SpringBootApplication
public class So68744775Application {
public static void main(String[] args) {
SpringApplication.run(So68744775Application.class, args);
}
private String[] topics;
private final AtomicInteger count = new AtomicInteger();
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
Foo foo() {
return new Foo();
}
@Bean
Supplier<String> idProvider() {
return () -> "so68744775-" count.getAndIncrement();
}
@Bean
Supplier<String[]> topicProvider() {
return () -> this.topics;
}
@Bean
ApplicationRunner runner(ApplicationContext context) {
return args -> {
this.topics = new String[] { "topic1", "topic2" };
context.getBean(Foo.class);
this.topics = new String[] { "topic3" };
context.getBean(Foo.class);
};
}
}
class Foo {
@KafkaListener(id = "#{idProvider.get()}", topics = "#{topicProvider.get()}", groupId = "grp")
public void listen(String in) {
System.out.println(in);
}
}
Однако было бы лучше опустить, groupId
чтобы каждый контейнер находился в своей собственной группе ( id
свойство). Это позволяет избежать ненужной перебалансировки при добавлении нового контейнера.
Комментарии:
1. Это решило мою проблему, спасибо за объяснение @GaryRussell .