Могу ли я добавлять темы в свой @kafkalistener во время выполнения

#spring-kafka

Вопрос:

Я создал компонент для массива тем, и во время выполнения я добавил некоторые темы в этот массив тем, но потребитель не обновил тему и все еще использует первые темы внутри массива тем. я бы хотел, чтобы потребитель добавил эти новые темы и начал потреблять с них

 @Autowired
private String[] topicArray;

@KafkaListener(topics = "#{topicArray}", groupId = "MyGroup")
    public void listen(...) {
        ...
    }
 

Ответ №1:

Нет; свойство оценивается один раз во время инициализации.

Вы не можете добавлять темы в существующий контейнер прослушивателя во время выполнения.

Однако вы можете сделать свой компонент прослушивателя прототипом компонента и создавать новый контейнер каждый раз, когда захотите прослушать новые темы.

Вот пример:

 @SpringBootApplication
public class So68744775Application {

    public static void main(String[] args) {
        SpringApplication.run(So68744775Application.class, args);
    }

    private String[] topics;

    private final AtomicInteger count = new AtomicInteger();

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    Foo foo() {
        return new Foo();
    }

    @Bean
    Supplier<String> idProvider() {
        return () -> "so68744775-"   count.getAndIncrement();
    }

    @Bean
    Supplier<String[]> topicProvider() {
        return () -> this.topics;
    }

    @Bean
    ApplicationRunner runner(ApplicationContext context) {
        return args -> {
            this.topics = new String[] { "topic1", "topic2" };
            context.getBean(Foo.class);
            this.topics = new String[] { "topic3" };
            context.getBean(Foo.class);
        };
    }

}

class Foo {

    @KafkaListener(id = "#{idProvider.get()}", topics = "#{topicProvider.get()}", groupId = "grp")
    public void listen(String in) {
        System.out.println(in);
    }

}
 

Однако было бы лучше опустить, groupId чтобы каждый контейнер находился в своей собственной группе ( id свойство). Это позволяет избежать ненужной перебалансировки при добавлении нового контейнера.

Комментарии:

1. Это решило мою проблему, спасибо за объяснение @GaryRussell .