Приостановка / запуск потоковых процессоров Kafka при загрузке Spring

#java #spring-boot #apache-kafka #spring-kafka #circuit-breaker

#java #весенняя загрузка #apache-kafka #spring-kafka #автоматический выключатель

Вопрос:

Я собираюсь реализовать шаблон автоматического отключения для сообщений. Основное требование заключается в том, что если микросервис не может публиковать сообщения в теме публикации, он должен прекратить принимать сообщения из других тем Kafka. Когда темы публикации становятся доступными, микросервис должен начать принимать сообщения из других тем Kafka.

Есть ли способ добиться этого в потоках Spring boot Kafka?

Ответ №1:

Я смог добиться этого с помощью BindingsEndpoint .

 private final BindingsEndpoint binding;

@Override
public void stop() {
    List<?> objects = binding.queryStates();
    if (!objects.isEmpty()) {
        log.info("Stopping Kafka topics ");
        List<Binding> bindings = getBindings(objects);
        bindings.forEach(Binding::stop);
        log.info("Stopped Kafka topics ");
    }
}

@Override
public void start() {
    List<?> objects = binding.queryStates();
    if (!objects.isEmpty()) {
        log.info("Starting Kafka topics ");
        List<Binding> bindings = getBindings(objects);
        bindings.forEach(Binding::start);
        log.info("Started Kafka topics ");
    }
}

protected List<Binding> getBindings(List<?> objects) {
    return objects.stream().filter(object -> object instanceof Binding)
            .map(object -> (Binding) object).collect(Collectors.toList());
}
 

Комментарии:

1. В чем разница между stop и pause? Почему и когда я должен использовать тот или иной?