#spring-boot #apache-kafka #kafka-consumer-api #spring-kafka
#spring-boot #apache-kafka #kafka-consumer-api #spring-kafka
Вопрос:
Я пытаюсь реализовать Kafka retry consumer в spring-boot и использую SeekToCurrentErrorHandler для повторных попыток.
Я установил политику отсрочки на 5 попыток повторения.
Мой вопрос в том, допустим, при первой попытке повторной попытки исключение было «база данных недоступна», а вторая попытка db была доступна, но на другом шаге произошел еще один сбой, например, тайм-аут, в этом случае счетчик повторных попыток вернется к нулю и начнет новую попытку или продолжитпопробуйте только из оставшихся действий с 1-й попытки.
Мое требование — сбрасывать счетчик повторных попыток обратно до нуля каждый раз, когда исключение отличается от того, что было сгенерировано ранее.
Как добиться этого в Kafka?
Вот моя конфигурация потребителя.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Anky> kafkaRetryListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Anky> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
System.out.println(
"RetryPolicy** limit has been exceeded! You should really handle this better." record.key());
}, new FixedBackOff(0L, 5L));
errorHandler.addNotRetryableException(IllegalArgumentException.class);
errorHandler.setCommitRecovered(true);
factory.setErrorHandler(errorHandler);
// to keep the consumers alive the failure gets reported to broker so that
// consumers remain alive
factory.setStatefulRetry(true);
factory.setConcurrency(2);
return factory;
}
@KafkaListener(topics = "${retry.topic}", groupId = "${consumer.groupId}", containerFactory = "kafkaRetryListenerContainerFactory")
public void onRetryMessage(ConsumerRecord<String, Anky> record, Acknowledgment acknowledgment)
throws SystemException {
LOGGER.debug("are you here in retry?***");
//process message has both DB calls as well as rest calls so it can fail due to a database error and can also fail due to HTTP failure
processMessage(record, acknowledgment, false, offerIntakeContext);
}
}
Ответ №1:
Это невозможно со стандартным обработчиком ошибок; он не отслеживает предыдущий тип исключения.
Вам понадобится пользовательский обработчик ошибок; единственный другой вариант — отслеживать предыдущее исключение в вашем прослушивателе и вызывать clearThreadState()
обработчик ошибок (из метода прослушивателя в потоке прослушивателя), который очистит все состояние повтора для этого потока прослушивателя.
Комментарии:
1. Спасибо за ответ. Есть ли способ получить счетчик повторных попыток, скажем, если для попыток повторения установлено значение 5, и выполняется попытка 2, могу ли я где-нибудь получить эту информацию?
2. Если вы используете версию 2.5 или более позднюю, вы можете установить свойство
deliveryAttemptHeader
контейнера, и контейнер добавит заголовокKafkaHeaders.DELIVERY_ATTEMPT
к записиheaders()
. docs.spring.io/spring-kafka/docs/2.6.2/reference/html /…3. Я открыл проблему, чтобы поддержать ваш вариант использования: github.com/spring-projects/spring-kafka/issues/1610