Как отключить автозапуск потребителя KAFKA без необходимости вносить какие-либо изменения в код, включая установку автозапуска = «{xyz}» в Spring Boot?

#java #apache-kafka #spring-kafka

#java #apache-kafka #spring-kafka

Вопрос:

Ниже приведен мой потребитель KAFKA

 @Service
public class Consumer {

    private static final Logger LOGGER = Logger.getLogger(Consumer.class.getName());
    public static Queue<ProductKafka> consumeQueue = new LinkedList<>();

    @KafkaListener(topics = "#{'${spring.kafka.topics}'.split('\\ ')}", groupId = "#{'${spring.kafka.groupId}'}")
    public void consume(ProductKafka productKafka) throws IOException {
        consumeQueue.add(productKafka);
        LOGGER.info(String.format("#### -> Logger Consumed message -> %s", productKafka.toString()));
        System.out.printf("#### -> Consumed message -> %s", productKafka.toString());
    }
}
 

и ниже приведен мой файл «application.properties»

 spring.kafka.topics=Product
spring.kafka.groupId=Product-Group
 

Мой потребитель KAFKA запускается автоматически.

Однако я хочу отключить автозапуск потребителя KAFKA без необходимости вносить какие-либо изменения в существующий код, включая установку автозапуска = «{xyz}» в классе consumer из-за требования.

Я ищу существующие свойства, которые отключили бы автозапуск потребителя KAFKA, что-то вроде этого

 spring.kafka.consumer.enable=false
 

Примечание: у меня есть несколько потребителей KAFKA, и указанное выше свойство должно отключить всех потребителей в проекте.

есть ли у нас какие-либо существующие свойства, которые бы отключали автозапуск KAFKA consumer без необходимости внесения каких-либо изменений в существующий код?

Комментарии:

1. Вы пытаетесь программно контролировать, когда потребители запускаются / останавливаются? Если это так, я не уверен, что Spring будет лучшим вариантом для этого уровня контроля

2. Нет. Все, что я хочу сделать, это отключить поведение автозапуска потребителя KAKFA, используя свойство spring profile (например, spring.kafka.consumer.enable=false), без необходимости вносить какие-либо изменения в код, включая настройку свойства autoStartup (например, — @KafkaListener(autoStartup = «{xyz}»)

3. Я не хочу вводить какие-либо пользовательские свойства — например: kafka.consumer.autostart=true и сопоставлять его с @KafkaListener(автозапуск = «#{‘${kafka.consumer.autostart}’}»)

4. > If so, I'm not sure Spring would be the best option for that level of control Spring абсолютно обеспечивает этот уровень контроля. docs.spring.io/spring-kafka/docs/current/reference/html /… Но нет стандартного свойства для управления им.

Ответ №1:

Стандартного готового свойства не существует; вы должны предоставить свое собственное.

autoStartup="${should.start:true}"

запустит контейнер, если свойство should.start отсутствует.

Редактировать

Просто добавьте что-то подобное в свое приложение.

 @Component
class Customizer {

    Customizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory,
            @Value("${start.containers:true}") boolean start) {

        factory.setAutoStartup(start);
    }

}
 
 start:
  containers: false
 

Комментарии:

1. Есть ли способ создать своего рода класс запуска, который считывал бы пользовательское свойство и управлял автозапуском Kafka? Причина в том, что в проекте есть несколько потребителей, и необходимо обновить несколько проектов. Я не хочу обновлять отдельных потребителей.

2. Вы можете установить autoStartup свойство на фабрике контейнеров, и оно будет применяться ко всем потребителям, созданным им. Свойство аннотации позволяет переопределять его для отдельных слушателей.

3. не могли бы вы помочь мне с примером?

4. @GaryRussell — Ошибка — java.lang. Исключение IllegalStateException: не удалось загрузить ApplicationContext, вызванный: org.springframework.beans.factory . Исключение UnsatisfiedDependencyException: ошибка при создании компонента с именем «kafkaConsumerCustomizer», определенным в файле [kafkaConsumerCustomizer.class ]: Неудовлетворенная зависимость, выраженная через параметр конструктора 0; вложенным исключением является org.springframework.beans.factory. Исключение NoSuchBeanDefinitionException: отсутствует соответствующий компонент типа ‘org.springframework.kafka.config. Доступно AbstractKafkaListenerContainerFactory’: ожидается наличие как минимум 1 компонента, который квалифицируется как кандидат на автоматическое подключение.

5. @GaryRussell — Не могли бы вы предложить?