Принудительно отключить приложение Spring Boot, когда не удается подключиться к Kafka

#spring #spring-boot #apache-kafka #kafka-producer-api #spring-kafka

#spring #spring-boot #apache-kafka #kafka-producer-api #spring-kafka

Вопрос:

Я хочу, чтобы мое приложение spring boot завершалось сбоем при запуске, если оно не может подключиться к брокеру kafka. Мое приложение публикует сообщения только в разделах. Я добавил эту строку в свой файл свойств, но пока безуспешно spring.kafka.admin.fail-fast=true .

Ответ №1:

Дополнение о том, как ускорить сбой

TL; DR Автоматическая настройка Spring Boot 2.4.5 не позволяет ускорить отказоустойчивость с помощью параметров env. Добавьте это в свой @Configuration, чтобы получить 10-секундный тайм-аут:

 @Bean
public KafkaAdmin kafkaAdmin(@Autowired KafkaProperties properties) {
    KafkaAdmin kafkaAdmin = new KafkaAdmin(properties.buildAdminProperties());
    kafkaAdmin.setFatalIfBrokerNotAvailable(properties.getAdmin().isFailFast());

    /* speed up fail fast */
    kafkaAdmin.setOperationTimeout(5);
    kafkaAdmin.setCloseTimeout(5);

    return kafkaAdmin;
}
  

Более подробный ответ

При выполнении initialize() метода класса org.springframework.kafka.core.KafkaAdmin происходит быстрый сбой. Этот метод может блокировать:

  1. При обнаружении новых тем создание темы блокируется на срок до operationTimeout
    1. Если не удалось создать раздел, он блокирует освобождение ресурсов на срок до closeTimeout

По умолчанию эти значения равны 30 и 10 секундам соответственно (жестко заданы в классе, упомянутом выше). Вы можете переопределить их, используя установленные методы: setOperationTimeout(int sec) , setCloseTimeout(int sec) .

Как насчет Spring Boot? KafkaAdmin компонент создается в классе org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration . Как вы можете догадаться, Spring Boot просто не устанавливает тайм-ауты:

 @Bean
@ConditionalOnMissingBean
public KafkaAdmin kafkaAdmin() {
    KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
    kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
    return kafkaAdmin;
}
  

Ответ №2:

fail-fast будет работать, только если в контексте есть хотя бы один NewTopic компонент (поэтому администратор попытается проверить, существует ли тема, и создать ее, если нет).

 @SpringBootApplication
public class So55177700Application {

    public static void main(String[] args) {
        SpringApplication.run(So55177700Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so55177700", 1, (short) 1);
    }

}

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-03-15 09:42:49.555 ERROR 41793 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalStateException: Could not configure topics
  

Комментарии:

1. Спасибо за ваш ответ. Таким образом, это создаст тему из запущенного приложения. Что мне нужно, так это аналогичное поведение, если я использую тему. В этом случае приложение завершит работу с ошибкой, потому что метод, аннотированный с KafkaListener , попытается подключить брокер kafka и использовать сообщение.

2. Потребители запускаются в потоке, отличном от того, который запускает приложение. Добавление компонента темы — единственный готовый механизм для выполнения того, что вы хотите. Вы можете написать свой собственный Lifecycle компонент, который подключается к брокеру в start() . Но я не уверен, зачем вам это нужно, поскольку вашему потребителю в любом случае нужна тема. И, если тема уже существует, ничего не будет сделано, администратор просто проверяет ее наличие.

3. Возможно ли это сделать fail-faster ? В моем случае приложению потребовалось ~ 40 секунд, прежде чем произошел сбой (принудительное завершение работы потока жесткого ввода-вывода), хотя это намного лучше, чем ~ 2 минуты 40 секунд без fail-fast функции.

4. Лучше задать новый вопрос, а не комментировать старый ответ. Людям легче находить ответы. Обязательно ссылайтесь на ответ в новом вопросе. Вы можете попробовать уменьшить значение свойства kafka admin client по умолчанию request.timeout.ms (30 секунд). Используя Boot, это spring.kafka.admin.properties.request.timeout.ms=5000