Как сбросить счетчик повторных попыток в Spring Kafka consumer, когда исключение, сгенерированное при первой попытке, отличается от второй попытки?

#spring-boot #apache-kafka #kafka-consumer-api #spring-kafka

#spring-boot #apache-kafka #kafka-consumer-api #spring-kafka

Вопрос:

Я пытаюсь реализовать Kafka retry consumer в spring-boot и использую SeekToCurrentErrorHandler для повторных попыток.

Я установил политику отсрочки на 5 попыток повторения.

Мой вопрос в том, допустим, при первой попытке повторной попытки исключение было «база данных недоступна», а вторая попытка db была доступна, но на другом шаге произошел еще один сбой, например, тайм-аут, в этом случае счетчик повторных попыток вернется к нулю и начнет новую попытку или продолжитпопробуйте только из оставшихся действий с 1-й попытки.

Мое требование — сбрасывать счетчик повторных попыток обратно до нуля каждый раз, когда исключение отличается от того, что было сгенерировано ранее.

Как добиться этого в Kafka?

Вот моя конфигурация потребителя.

 @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Anky> kafkaRetryListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, Anky> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);

        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
            System.out.println(
                    "RetryPolicy** limit has been exceeded! You should really handle this better."   record.key());
        }, new FixedBackOff(0L, 5L));
        errorHandler.addNotRetryableException(IllegalArgumentException.class);
        errorHandler.setCommitRecovered(true);

        factory.setErrorHandler(errorHandler);
        // to keep the consumers alive the failure gets reported to broker so that
        // consumers remain alive
        factory.setStatefulRetry(true);
        factory.setConcurrency(2);
        return factory;
    }

@KafkaListener(topics = "${retry.topic}", groupId = "${consumer.groupId}", containerFactory = "kafkaRetryListenerContainerFactory")
    public void onRetryMessage(ConsumerRecord<String, Anky> record, Acknowledgment acknowledgment)
            throws SystemException {
        LOGGER.debug("are you here in retry?***");
//process message has both DB calls as well as rest calls so it can fail due to a database error and can also fail due to HTTP failure
            processMessage(record, acknowledgment, false, offerIntakeContext);
            }
            
    }
  

Ответ №1:

Это невозможно со стандартным обработчиком ошибок; он не отслеживает предыдущий тип исключения.

Вам понадобится пользовательский обработчик ошибок; единственный другой вариант — отслеживать предыдущее исключение в вашем прослушивателе и вызывать clearThreadState() обработчик ошибок (из метода прослушивателя в потоке прослушивателя), который очистит все состояние повтора для этого потока прослушивателя.

Комментарии:

1. Спасибо за ответ. Есть ли способ получить счетчик повторных попыток, скажем, если для попыток повторения установлено значение 5, и выполняется попытка 2, могу ли я где-нибудь получить эту информацию?

2. Если вы используете версию 2.5 или более позднюю, вы можете установить свойство deliveryAttemptHeader контейнера, и контейнер добавит заголовок KafkaHeaders.DELIVERY_ATTEMPT к записи headers() . docs.spring.io/spring-kafka/docs/2.6.2/reference/html /…

3. Я открыл проблему, чтобы поддержать ваш вариант использования: github.com/spring-projects/spring-kafka/issues/1610