SeekToCurrentErrorHandler ExponentialBackOff будет регистрировать ошибки после завершения отката, это намеренно?

#spring-kafka

#spring-kafka

Вопрос:

Контекст:

Документы в «повторной попытке с отслеживанием состояния» (https://docs.spring.io/spring-kafka/reference/html/#stateful-retry ) и «стремиться к текущему» (https://docs.spring.io/spring-kafka/reference/html/#seek-to-current ) сделайте так, чтобы это звучало так, что как пользователь, я должен перейти от RetryTemplate к использованию функции BackOff в SeekToCurrentErrorHandler .

В настоящее время у меня есть сочетание RetryTemplate с бесконечным циклом для определенных исключений SeekToCurrentErrorHandler с фиксированной повторной попыткой 3 раза, которая работает для всех других исключений.

Теперь я пытаюсь заменить эту попытку на handler.setBackOffFunction((record, ex) -> { ... }); , но я столкнулся со следующей проблемой

Но я не уверен, что это было задумано, я неправильно настраиваю или это ошибка.

  • Весенняя загрузка 2.4.0
  • spring-kafka 2.6.3

Вопрос:

Когда я использую SeekToCurrentErrorHandler с большими интервалами, сообщение об ошибке для «эй, ваш слушатель выдал исключение» появляется в журнале ПОСЛЕ завершения интервала. Это намеренно? Мой код выдает исключение, и запись в журнале может появиться намного позже.

Здесь мы выполняем строку 1 22:59:14 . Вскоре после этого выдается исключение, но появляется в журналах через 10 секунд 22:59:24 . При использовании ExponentialBackOff этот таймфрейм становится все больше и больше.

 foo1 at 2020-11-20T22:59:14.850721600
2020-11-20 22:59:24.976  INFO 21456 --- [  kgh1235-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-kgh1235-1, groupId=kgh1235] Seeking to offset 21 for partition kgh1235-0
2020-11-20 22:59:24.976 ERROR 21456 --- [  kgh1235-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void de.roppelt.Application.listen(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: Thrown at 2020-11-20T22:59:14.850721600; nested exception is java.lang.RuntimeException: Thrown at 2020-11-20T22:59:14.850721600
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:157) ~[spring-kafka-2.6.3.jar:2.6.3]
 

Воспроизводимо со следующими:

 @SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @KafkaListener(id = "so64937593", topics = "so64937593")
    public void listen(String in) {
        LocalDateTime now = LocalDateTime.now();
        System.out.println(in   " at "   now);
        throw new RuntimeException("Thrown at "   now);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler((rec,ex) -> System.out.println("I am the dlq"), new ExponentialBackOff());
        factory.setErrorHandler(eh);
        return factory;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so64937593", 1, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send("so64937593", "foo"   i));
        };
    }

}
 

Я нашел следующее, чтобы дать мне запись в журнал сразу после:

 RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setListeners(new RetryListener[]{new RetryListener() {
    @Override
    public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
        return true;
    }
    @Override
    public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
    }
    @Override
    public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
        System.out.println(LocalDateTime.now()   "Oops happened at "   context);
    }
}});
retryTemplate.setRetryPolicy(new NeverRetryPolicy());
factory.setRetryTemplate(retryTemplate);
 

Итак, я добавляю RetryTemplate с политикой «Neverr Try», чтобы я мог использовать регистратор.
, где теперь у меня есть немедленная обратная связь о том, что произошла ошибка.

 foo1 at 2020-11-20T23:05:57.769425100
2020-11-20T23:05:57.769425100Oops happened at [RetryContext: count=1, lastException=org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void de.roppelt.Application.listen(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: Thrown at 2020-11-20T23:05:57.769425100, exhausted=false]
2020-11-20 23:06:07.897  INFO 22608 --- [  kgh1235-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-kgh1235-1, groupId=kgh1235] Seeking to offset 21 for partition kgh1235-0
2020-11-20 23:06:07.897 ERROR 22608 --- [  kgh1235-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception
 

Я думаю, что это ошибка, но я хотел отложить и опубликовать здесь, прежде чем отправлять вопрос на github.

Ответ №1:

Журнал записывается контейнером после завершения работы обработчика ошибок (у нас нет выбора по этому поводу).

Однако вы можете подавить эти журналы, изменив уровень журнала на SeekToCurrentErrorHandler . Он устанавливает уровень для исключения, и контейнер будет регистрировать его на этом уровне.

 /**
 * Set the level at which the exception thrown by this handler is logged.
 * @param logLevel the level (default ERROR).
 */
public void setLogLevel(KafkaException.Level logLevel) {
    Assert.notNull(logLevel, "'logLevel' cannot be null");
    this.logLevel = logLevel;
}
 

Это доступно с версии 2.5.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handlers

Как правило, обработчики ошибок, предоставляемые фреймворком, выдают исключение, когда ошибка не «обрабатывается» (например, после выполнения операции поиска). По умолчанию такие исключения регистрируются контейнером на уровне ОШИБОК. Начиная с версии 2.5, все обработчики ошибок framework расширяются KafkaExceptionLogLevelAware , что позволяет вам контролировать уровень, на котором регистрируются эти исключения.

Если вы хотите зарегистрировать первопричину (исходное исключение) до отката, вы можете подклассировать обработчик ошибок и войти в журнал перед вызовом super.handle(...) .

Если вы хотите, чтобы фреймворк регистрировал ошибку, прежде чем отступать, не стесняйтесь открывать проблему GitHub.

Комментарии:

1. Спасибо, это все объясняет. Работает так, как задумано, но у меня есть несколько вариантов.

2. Я отправлю проблему.

Ответ №2:

Основываясь на ansere Гэри, я придумал следующее.

Он сразу регистрирует исключение с некоторым контекстом (попытка, тема, смещение). Что несколько похоже на то, что предлагает LoggingListener в RetryTemplate .

 public class LoggingSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

    public LoggingSeekToCurrentErrorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> handler, BackOff backOff) {
        super(handler, backOff);
    }

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        ConsumerRecord<?, ?> record = records.get(0);
        TopicPartitionOffset recordPosition = new TopicPartitionOffset(record.topic(), record.partition(), record.offset());
        String delivery = container.getContainerProperties().isDeliveryAttemptHeader() ? "" deliveryAttempt(recordPosition) : "NA";
        log.error("Listener threw an exception, attempt {}. On partition {} at offset {}. Record timestamp {}", delivery, recordPosition.getTopicPartition(), recordPosition.getOffset(), record.timestamp(), thrownException);

        super.handle(thrownException, records, consumer, container);
    }
}
 

Добавлено на завод:

 factory.getContainerProperties().setDeliveryAttemptHeader(true); // optional, see https://docs.spring.io/spring-kafka/reference/html/#delivery-header
errorHandler.setLogLevel(KafkaException.Level.DEBUG); // since our handler already logs. Prevent double logging of stacktrace
 

Результирующий образец:

 @Slf4j
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @KafkaListener(id = "so64937593", topics = "so64937593")
    public void listen(String in) {
        log.info("Processing {}", in);
        throw new RuntimeException("Thrown at "   LocalDateTime.now());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setDeliveryAttemptHeader(true); // optional, see https://docs.spring.io/spring-kafka/reference/html/#delivery-header
        SeekToCurrentErrorHandler eh = new LoggingSeekToCurrentErrorHandler((rec,ex) -> log.error("I am the dlq"), new FixedBackOff(10_000, Long.MAX_VALUE));
        eh.setLogLevel(KafkaException.Level.DEBUG); // since our handler already logs. Prevent double logging of stacktrace
        factory.setErrorHandler(eh);
        return factory;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so64937593", 1, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send("so64937593", "foo"   i));
        };
    }


    public static class LoggingSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

        public LoggingSeekToCurrentErrorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> handler, BackOff backOff) {
            super(handler, backOff);
        }

        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
            ConsumerRecord<?, ?> record = records.get(0);
            TopicPartitionOffset recordPosition = new TopicPartitionOffset(record.topic(), record.partition(), record.offset());
            String delivery = container.getContainerProperties().isDeliveryAttemptHeader() ? "" deliveryAttempt(recordPosition) : "NA";
            log.error("Listener threw an exception, attempt {}. On partition {} at offset {}. Record timestamp {}", delivery, recordPosition.getTopicPartition(), recordPosition.getOffset(), record.timestamp(), thrownException);

            super.handle(thrownException, records, consumer, container);
        }
    }
}
 

Комментарии:

1. К вашему сведению, вы можете получить попытку доставки без включения заголовка; просто позвоните getDeliveryAttempt(recordPosition) . Конечно, это спорно, если вы хотите предоставить приложению попытку доставки (через заголовок).

2. О, приятно знать, я просто предположил, что он не будет работать или всегда будет равен 1, если он не был включен в заводских настройках.

3. Нет; этот флаг просто сообщает контейнеру вызвать этот метод, чтобы получить попытку доставки и добавить его в качестве заголовка к записи.