#spring-kafka
#spring-kafka
Вопрос:
Я создаю общую конфигурацию spring-kafka для команд для использования в их проектах.
Я хотел бы определить общий пользовательский обработчик ошибок на уровне контейнера и разрешить проекту определять обработчик ошибок прослушивателя для каждого прослушивателя. Все, что не обрабатывается обработчиком ошибок прослушивателя, должно возвращаться в контейнер.
Из того, что я тестировал до сих пор, это либо одно, либо другое. есть ли способ заставить их работать вместе?
Имело бы смысл иметь цепочку обработчиков на уровне контейнера и позволять проектам добавлять обработчики ошибок в цепочку?
Ответ №1:
Ничто не мешает вам настроить оба обработчика ошибок…
@SpringBootApplication
public class So55001718Application {
public static void main(String[] args) {
SpringApplication.run(So55001718Application.class, args);
}
@KafkaListener(id = "so55001718", topics = "so55001718", errorHandler = "listenerEH")
public void listen(String in) {
System.out.println(in);
if ("bad1".equals(in)) {
throw new IllegalStateException();
}
else if("bad2".equals(in)) {
throw new IllegalArgumentException();
}
}
@Bean
public KafkaListenerErrorHandler listenerEH() {
return (m, t) -> {
if (t.getCause() instanceof IllegalStateException) {
System.out.println(
t.getClass().getSimpleName() " bad record " m.getPayload() " handled by listener EH");
return null;
}
else {
throw (t);
}
};
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setErrorHandler((t, r) -> {
System.out.println(t.getClass().getSimpleName() " bad record " r.value() " handled by container EH");
});
return factory;
}
@Bean
public NewTopic topic() {
return new NewTopic("so55001718", 1, (short) 1);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so55001718", "good");
template.send("so55001718", "bad1");
template.send("so55001718", "bad2");
};
}
}
и
good
bad1
ListenerExecutionFailedException bad record bad1 handled by listener EH
bad2
ListenerExecutionFailedException bad record bad2 handled by container EH
Вы можете создать простую оболочку для обертывания нескольких обработчиков ошибок; не стесняйтесь открывать проблему на GitHub (вклад приветствуется).
Комментарии:
1. Я думаю, это то, что я пропустил при тестировании. Я не переназначал исключение в обработчике прослушивателя.
2. Спасибо. Именно то, что я ищу: D