#java #spring #apache-kafka #spring-kafka
#java #весна #apache-kafka #spring-kafka
Вопрос:
У меня есть простой потребитель в Spring working. У меня есть класс конфигурации, определенный с помощью множества фабрик и т. Д. Когда я удаляю класс config, потребитель все еще работает. Мне интересно, в чем преимущество наличия фабрики, т.Е.:
@Bean
public ConcurrentKafkaListenerContainerFactory<String,
GenericRecord> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
public ConsumerFactory<String, GenericRecord> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(retrieveConsumerConfigs());
}
а теперь просто передаем vals через свойства приложения и называем это днем. У меня есть явный контроль над конфигурацией в подходе, основанном на классах, но я также думал, что я мог бы удалить класс и сделать val доступными через переменные spring env, такие как spring.kafka.bootstrapservers , например.
Ответ №1:
Для методов требуется завод по производству контейнеров @KafkaListener
.
Spring Boot автоматически настроит один из них (из application.properties/yml), если вы не предоставите свой собственный компонент. Смотрите KafkaAutoConfiguration
.
Загрузка также настроит фабрику-потребителя (если вы этого не сделаете).
Приложению, как правило, не требуется объявлять какие-либо компоненты инфраструктуры.
Редактировать
Я предпочитаю никогда не объявлять свои собственные компоненты инфраструктуры. Если мне нужна какая-то функция, которая не отображается как свойство загрузки, или когда я хочу переопределить какое-то свойство только для одного контейнера, я просто добавляю компонент customizer .
@Component
class Customizer {
public Customizer(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
factory.setContainerCustomizer(container -> {
if (container.getContainerProperties().getGroupId().equals("slowGroup")) {
container.getContainerProperties().setIdleBetweenPolls(60_000);
}
});
}
}
или
@Component
class Customizer {
Customizer(AbstractKafkaListenerContainerFactory<?, ?, ?> containerFactory,
ThreadPoolTaskExecutor exec) {
containerFactory.getContainerProperties().setConsumerTaskExecutor(exec);
}
}
и т.д.
Комментарии:
1. Спасибо, Гэри. У меня все работает, довольно приятно удалить мой класс и делегировать через свойства. Что вы думаете по этому поводу? Может быть, если мне нужно больше контроля, класс имеет смысл, но я мало что делаю за пределами ручных подтверждений и т. Д.
2.
>Thoughts on that? Maybe if I need a ton more control a class makes sense,
См. Редактирование моего ответа.
Ответ №2:
простой потребитель в Spring работает, потому что автоматическая настройка spring-boot за капотами создает объект ConcurrentKafkaListenerContainerFactory
и регистрирует его в контейнере spring.
Вы можете проверить это, внедрив реализацию KafkaListenerContainerFactory
, как показано ниже:
@RestController
public class EmployeeController {
private final KafkaListenerContainerFactory kafkaListenerContainerFactory;
@Autowired
public EmployeeController(KafkaListenerContainerFactory kafkaListenerContainerFactory) {
System.out.println(kafkaListenerContainerFactory instanceof ConcurrentKafkaListenerContainerFactory);
this.kafkaListenerContainerFactory = kafkaListenerContainerFactory;
}
}
Но если вас не устраивает автоматически сгенерированный компонент spring boot, вы можете создать свой собственный компонент и зарегистрировать его в контейнере spring с помощью @Bean
аннотации