Является ли kafka container factory обязательным требованием в Spring Kafka?

#java #spring #apache-kafka #spring-kafka

#java #весна #apache-kafka #spring-kafka

Вопрос:

У меня есть простой потребитель в Spring working. У меня есть класс конфигурации, определенный с помощью множества фабрик и т. Д. Когда я удаляю класс config, потребитель все еще работает. Мне интересно, в чем преимущество наличия фабрики, т.Е.:

     @Bean
    public ConcurrentKafkaListenerContainerFactory<String,
            GenericRecord> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        return factory;
    }

    public ConsumerFactory<String, GenericRecord> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(retrieveConsumerConfigs());
    }
 

а теперь просто передаем vals через свойства приложения и называем это днем. У меня есть явный контроль над конфигурацией в подходе, основанном на классах, но я также думал, что я мог бы удалить класс и сделать val доступными через переменные spring env, такие как spring.kafka.bootstrapservers , например.

Ответ №1:

Для методов требуется завод по производству контейнеров @KafkaListener .

Spring Boot автоматически настроит один из них (из application.properties/yml), если вы не предоставите свой собственный компонент. Смотрите KafkaAutoConfiguration .

Загрузка также настроит фабрику-потребителя (если вы этого не сделаете).

Приложению, как правило, не требуется объявлять какие-либо компоненты инфраструктуры.

Редактировать

Я предпочитаю никогда не объявлять свои собственные компоненты инфраструктуры. Если мне нужна какая-то функция, которая не отображается как свойство загрузки, или когда я хочу переопределить какое-то свойство только для одного контейнера, я просто добавляю компонент customizer .

 @Component
class Customizer {

    public Customizer(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {

        factory.setContainerCustomizer(container -> {
            if (container.getContainerProperties().getGroupId().equals("slowGroup")) {
                container.getContainerProperties().setIdleBetweenPolls(60_000);
            }
        });
    }

}
 

или

 @Component
class Customizer {

    Customizer(AbstractKafkaListenerContainerFactory<?, ?, ?> containerFactory,
            ThreadPoolTaskExecutor exec) {

        containerFactory.getContainerProperties().setConsumerTaskExecutor(exec);
    }

}
 

и т.д.

Комментарии:

1. Спасибо, Гэри. У меня все работает, довольно приятно удалить мой класс и делегировать через свойства. Что вы думаете по этому поводу? Может быть, если мне нужно больше контроля, класс имеет смысл, но я мало что делаю за пределами ручных подтверждений и т. Д.

2. >Thoughts on that? Maybe if I need a ton more control a class makes sense, См. Редактирование моего ответа.

Ответ №2:

простой потребитель в Spring работает, потому что автоматическая настройка spring-boot за капотами создает объект ConcurrentKafkaListenerContainerFactory и регистрирует его в контейнере spring.

Вы можете проверить это, внедрив реализацию KafkaListenerContainerFactory , как показано ниже:

 @RestController
public class EmployeeController {

    private final KafkaListenerContainerFactory kafkaListenerContainerFactory;

    @Autowired
    public EmployeeController(KafkaListenerContainerFactory kafkaListenerContainerFactory) {
        System.out.println(kafkaListenerContainerFactory instanceof ConcurrentKafkaListenerContainerFactory);
        this.kafkaListenerContainerFactory = kafkaListenerContainerFactory;
    }
}
 

Но если вас не устраивает автоматически сгенерированный компонент spring boot, вы можете создать свой собственный компонент и зарегистрировать его в контейнере spring с помощью @Bean аннотации