#spring-boot #apache-kafka #spring-cloud-stream #resilience4j
#пружинный загрузчик #apache-kafka #spring-cloud-stream #resilience4j
Вопрос:
Мне нужна помощь в понимании того, как я могу найти решение с использованием Spring boot, Kafka, Resilence4J для выполнения вызова микросервиса от моего потребителя Kafka. Допустим, если микросервис не работает, мне нужно уведомить моего потребителя Kafka, используя шаблон автоматического выключателя, чтобы прекратить получение сообщений / событий до тех пор, пока микросервис не будет запущен и запущен.
Комментарии:
1. Привет, просто для уточнения. У вас есть служба A, которая использует сообщения из темы и вызывает другую службу B для каждого нового сообщения? Вы хотите прекратить использование сообщений из темы, если служба B не работает?
2. @RobertWinkler абсолютно прав. А также попробуйте повторить попытку перед размыканием цепи
Ответ №1:
С Spring Kafka вы можете использовать методы pause
и resume
в зависимости от переходов состояния выключателя. Лучший способ, который я нашел для этого, — определить его как «супервизор» с аннотацией @Configuration. Также используется Resilience4j.
@Configuration
public class CircuitBreakerConsumerConfiguration {
public CircuitBreakerConsumerConfiguration(CircuitBreakerRegistry circuitBreakerRegistry, KafkaManager kafkaManager) {
circuitBreakerRegistry.circuitBreaker("yourCBName").getEventPublisher().onStateTransition(event -> {
switch (event.getStateTransition()) {
case CLOSED_TO_OPEN:
case CLOSED_TO_FORCED_OPEN:
case HALF_OPEN_TO_OPEN:
kafkaManager.pause();
break;
case OPEN_TO_HALF_OPEN:
case HALF_OPEN_TO_CLOSED:
case FORCED_OPEN_TO_CLOSED:
case FORCED_OPEN_TO_HALF_OPEN:
kafkaManager.resume();
break;
default:
throw new IllegalStateException("Unknown transition state: " event.getStateTransition());
}
});
}
}
Это то , что я использовал в сочетании с аннотированным @Component
KafkaManager .
@Component
public class KafkaManager {
private final KafkaListenerEndpointRegistry registry;
public KafkaManager(KafkaListenerEndpointRegistry registry) {
this.registry = registry;
}
public void pause() {
registry.getListenerContainers().forEach(MessageListenerContainer::pause);
}
public void resume() {
registry.getListenerContainers().forEach(MessageListenerContainer::resume);
}
}
Кроме того, мой потребительский сервис выглядит так:
@KafkaListener(topics = "#{'${topic.name}'}", concurrency = "1", id = "CBListener")
public void receive(final ConsumerRecord<String, ReplayData> replayData, Acknowledgment acknowledgment) throws
Exception {
try {
httpClientServiceCB.receiveHandleCircuitBreaker(replayData);
acknowledgement.acknowledge();
} catch (Exception e) {
acknowledgment.nack(1000);
}
}
И @CircuitBreaker
аннотацию:
@CircuitBreaker(name = "yourCBName")
public void receiveHandleCircuitBreaker(ConsumerRecord<String, ReplayData> replayData) throws
Exception {
try {
String response = restTemplate.getForObject("http://localhost:8081/item", String.class);
} catch (Exception e ) {
// throwing the exception is needed to trigger the Circuit Breaker state change
throw new Exception();
}
}
И это дополнительно дополняется следующим application.properties
resilience4j.circuitbreaker.instances.yourCBName.failure-rate-threshold=80
resilience4j.circuitbreaker.instances.yourCBName.sliding-window-type=COUNT_BASED
resilience4j.circuitbreaker.instances.yourCBName.sliding-window-size=5
resilience4j.circuitbreaker.instances.yourCBName.wait-duration-in-open-state=10000
resilience4j.circuitbreaker.instances.yourCBName.automatic-transition-from-open-to-half-open-enabled=true
spring.kafka.consumer.enable.auto.commit = false
spring.kafka.listener.ack-mode = MANUAL_IMMEDIATE
Также взгляните на https://resilience4j.readme.io/docs/circuitbreaker
Комментарии:
1. Совместимо ли это с реализацией SeekToCurrentErrorHandler / DefaultErrorHandler, используемой для повторной попытки?
Ответ №2:
Если вы используете Spring Kafka, вы могли бы использовать методы pause
и resume
ConcurrentMessageListenerContainer
класса. Вы можете прикрепить EventListener к прерывателю цепи, который прослушивает переходы состояний и приостанавливает или возобновляет обработку событий. Введите CircuitBreakerRegistry в ваш компонент:
circuitBreakerRegistry.circuitBreaker("yourCBName").getEventPublisher().onStateTransition(
event -> {
switch (event.getStateTransition()) {
case CLOSED_TO_OPEN:
container.pause();
case OPEN_TO_HALF_OPEN:
container.resume();
case HALF_OPEN_TO_CLOSED:
container.resume();
case HALF_OPEN_TO_OPEN:
container.pause();
case CLOSED_TO_FORCED_OPEN:
container.pause();
case FORCED_OPEN_TO_CLOSED:
container.resume();
case FORCED_OPEN_TO_HALF_OPEN:
container.resume();
default:
}
}
);