#spring-kafka
#spring-kafka
Вопрос:
Контекст:
Документы в «повторной попытке с отслеживанием состояния» (https://docs.spring.io/spring-kafka/reference/html/#stateful-retry ) и «стремиться к текущему» (https://docs.spring.io/spring-kafka/reference/html/#seek-to-current ) сделайте так, чтобы это звучало так, что как пользователь, я должен перейти от RetryTemplate к использованию функции BackOff в SeekToCurrentErrorHandler
.
В настоящее время у меня есть сочетание RetryTemplate
с бесконечным циклом для определенных исключений SeekToCurrentErrorHandler
с фиксированной повторной попыткой 3 раза, которая работает для всех других исключений.
Теперь я пытаюсь заменить эту попытку на handler.setBackOffFunction((record, ex) -> { ... });
, но я столкнулся со следующей проблемой
Но я не уверен, что это было задумано, я неправильно настраиваю или это ошибка.
- Весенняя загрузка 2.4.0
- spring-kafka 2.6.3
Вопрос:
Когда я использую SeekToCurrentErrorHandler
с большими интервалами, сообщение об ошибке для «эй, ваш слушатель выдал исключение» появляется в журнале ПОСЛЕ завершения интервала. Это намеренно? Мой код выдает исключение, и запись в журнале может появиться намного позже.
Здесь мы выполняем строку 1 22:59:14
. Вскоре после этого выдается исключение, но появляется в журналах через 10 секунд 22:59:24
. При использовании ExponentialBackOff
этот таймфрейм становится все больше и больше.
foo1 at 2020-11-20T22:59:14.850721600
2020-11-20 22:59:24.976 INFO 21456 --- [ kgh1235-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-kgh1235-1, groupId=kgh1235] Seeking to offset 21 for partition kgh1235-0
2020-11-20 22:59:24.976 ERROR 21456 --- [ kgh1235-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void de.roppelt.Application.listen(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: Thrown at 2020-11-20T22:59:14.850721600; nested exception is java.lang.RuntimeException: Thrown at 2020-11-20T22:59:14.850721600
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:157) ~[spring-kafka-2.6.3.jar:2.6.3]
Воспроизводимо со следующими:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@KafkaListener(id = "so64937593", topics = "so64937593")
public void listen(String in) {
LocalDateTime now = LocalDateTime.now();
System.out.println(in " at " now);
throw new RuntimeException("Thrown at " now);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler((rec,ex) -> System.out.println("I am the dlq"), new ExponentialBackOff());
factory.setErrorHandler(eh);
return factory;
}
@Bean
public NewTopic topic() {
return new NewTopic("so64937593", 1, (short) 1);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send("so64937593", "foo" i));
};
}
}
Я нашел следующее, чтобы дать мне запись в журнал сразу после:
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setListeners(new RetryListener[]{new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
System.out.println(LocalDateTime.now() "Oops happened at " context);
}
}});
retryTemplate.setRetryPolicy(new NeverRetryPolicy());
factory.setRetryTemplate(retryTemplate);
Итак, я добавляю RetryTemplate с политикой «Neverr Try», чтобы я мог использовать регистратор.
, где теперь у меня есть немедленная обратная связь о том, что произошла ошибка.
foo1 at 2020-11-20T23:05:57.769425100
2020-11-20T23:05:57.769425100Oops happened at [RetryContext: count=1, lastException=org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void de.roppelt.Application.listen(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: Thrown at 2020-11-20T23:05:57.769425100, exhausted=false]
2020-11-20 23:06:07.897 INFO 22608 --- [ kgh1235-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-kgh1235-1, groupId=kgh1235] Seeking to offset 21 for partition kgh1235-0
2020-11-20 23:06:07.897 ERROR 22608 --- [ kgh1235-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception
Я думаю, что это ошибка, но я хотел отложить и опубликовать здесь, прежде чем отправлять вопрос на github.
Ответ №1:
Журнал записывается контейнером после завершения работы обработчика ошибок (у нас нет выбора по этому поводу).
Однако вы можете подавить эти журналы, изменив уровень журнала на SeekToCurrentErrorHandler
. Он устанавливает уровень для исключения, и контейнер будет регистрировать его на этом уровне.
/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel) {
Assert.notNull(logLevel, "'logLevel' cannot be null");
this.logLevel = logLevel;
}
Это доступно с версии 2.5.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handlers
Как правило, обработчики ошибок, предоставляемые фреймворком, выдают исключение, когда ошибка не «обрабатывается» (например, после выполнения операции поиска). По умолчанию такие исключения регистрируются контейнером на уровне ОШИБОК. Начиная с версии 2.5, все обработчики ошибок framework расширяются
KafkaExceptionLogLevelAware
, что позволяет вам контролировать уровень, на котором регистрируются эти исключения.
Если вы хотите зарегистрировать первопричину (исходное исключение) до отката, вы можете подклассировать обработчик ошибок и войти в журнал перед вызовом super.handle(...)
.
Если вы хотите, чтобы фреймворк регистрировал ошибку, прежде чем отступать, не стесняйтесь открывать проблему GitHub.
Комментарии:
1. Спасибо, это все объясняет. Работает так, как задумано, но у меня есть несколько вариантов.
2. Я отправлю проблему.
Ответ №2:
Основываясь на ansere Гэри, я придумал следующее.
Он сразу регистрирует исключение с некоторым контекстом (попытка, тема, смещение). Что несколько похоже на то, что предлагает LoggingListener в RetryTemplate .
public class LoggingSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {
public LoggingSeekToCurrentErrorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> handler, BackOff backOff) {
super(handler, backOff);
}
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
ConsumerRecord<?, ?> record = records.get(0);
TopicPartitionOffset recordPosition = new TopicPartitionOffset(record.topic(), record.partition(), record.offset());
String delivery = container.getContainerProperties().isDeliveryAttemptHeader() ? "" deliveryAttempt(recordPosition) : "NA";
log.error("Listener threw an exception, attempt {}. On partition {} at offset {}. Record timestamp {}", delivery, recordPosition.getTopicPartition(), recordPosition.getOffset(), record.timestamp(), thrownException);
super.handle(thrownException, records, consumer, container);
}
}
Добавлено на завод:
factory.getContainerProperties().setDeliveryAttemptHeader(true); // optional, see https://docs.spring.io/spring-kafka/reference/html/#delivery-header
errorHandler.setLogLevel(KafkaException.Level.DEBUG); // since our handler already logs. Prevent double logging of stacktrace
Результирующий образец:
@Slf4j
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@KafkaListener(id = "so64937593", topics = "so64937593")
public void listen(String in) {
log.info("Processing {}", in);
throw new RuntimeException("Thrown at " LocalDateTime.now());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setDeliveryAttemptHeader(true); // optional, see https://docs.spring.io/spring-kafka/reference/html/#delivery-header
SeekToCurrentErrorHandler eh = new LoggingSeekToCurrentErrorHandler((rec,ex) -> log.error("I am the dlq"), new FixedBackOff(10_000, Long.MAX_VALUE));
eh.setLogLevel(KafkaException.Level.DEBUG); // since our handler already logs. Prevent double logging of stacktrace
factory.setErrorHandler(eh);
return factory;
}
@Bean
public NewTopic topic() {
return new NewTopic("so64937593", 1, (short) 1);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send("so64937593", "foo" i));
};
}
public static class LoggingSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {
public LoggingSeekToCurrentErrorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> handler, BackOff backOff) {
super(handler, backOff);
}
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
ConsumerRecord<?, ?> record = records.get(0);
TopicPartitionOffset recordPosition = new TopicPartitionOffset(record.topic(), record.partition(), record.offset());
String delivery = container.getContainerProperties().isDeliveryAttemptHeader() ? "" deliveryAttempt(recordPosition) : "NA";
log.error("Listener threw an exception, attempt {}. On partition {} at offset {}. Record timestamp {}", delivery, recordPosition.getTopicPartition(), recordPosition.getOffset(), record.timestamp(), thrownException);
super.handle(thrownException, records, consumer, container);
}
}
}
Комментарии:
1. К вашему сведению, вы можете получить попытку доставки без включения заголовка; просто позвоните
getDeliveryAttempt(recordPosition)
. Конечно, это спорно, если вы хотите предоставить приложению попытку доставки (через заголовок).2. О, приятно знать, я просто предположил, что он не будет работать или всегда будет равен 1, если он не был включен в заводских настройках.
3. Нет; этот флаг просто сообщает контейнеру вызвать этот метод, чтобы получить попытку доставки и добавить его в качестве заголовка к записи.