возможно ли иметь как обработчики ошибок прослушивателя, так и контейнера

#spring-kafka

#spring-kafka

Вопрос:

Я создаю общую конфигурацию spring-kafka для команд для использования в их проектах.

Я хотел бы определить общий пользовательский обработчик ошибок на уровне контейнера и разрешить проекту определять обработчик ошибок прослушивателя для каждого прослушивателя. Все, что не обрабатывается обработчиком ошибок прослушивателя, должно возвращаться в контейнер.

Из того, что я тестировал до сих пор, это либо одно, либо другое. есть ли способ заставить их работать вместе?

Имело бы смысл иметь цепочку обработчиков на уровне контейнера и позволять проектам добавлять обработчики ошибок в цепочку?

Ответ №1:

Ничто не мешает вам настроить оба обработчика ошибок…

 @SpringBootApplication
public class So55001718Application {

    public static void main(String[] args) {
        SpringApplication.run(So55001718Application.class, args);
    }

    @KafkaListener(id = "so55001718", topics = "so55001718", errorHandler = "listenerEH")
    public void listen(String in) {
        System.out.println(in);
        if ("bad1".equals(in)) {
            throw new IllegalStateException();
        }
        else if("bad2".equals(in)) {
            throw new IllegalArgumentException();
        }
    }

    @Bean
    public KafkaListenerErrorHandler listenerEH() {
        return (m, t) -> {
            if (t.getCause() instanceof IllegalStateException) {
                System.out.println(
                        t.getClass().getSimpleName()   " bad record "   m.getPayload()   " handled by listener EH");
                return null;
            }
            else {
                throw (t);
            }
        };
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.setErrorHandler((t, r) -> {
            System.out.println(t.getClass().getSimpleName()   " bad record "   r.value()   " handled by container EH");
        });
        return factory;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so55001718", 1, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so55001718", "good");
            template.send("so55001718", "bad1");
            template.send("so55001718", "bad2");
        };
    }
}
  

и

 good
bad1
ListenerExecutionFailedException bad record bad1 handled by listener EH
bad2
ListenerExecutionFailedException bad record bad2 handled by container EH
  

Вы можете создать простую оболочку для обертывания нескольких обработчиков ошибок; не стесняйтесь открывать проблему на GitHub (вклад приветствуется).

Комментарии:

1. Я думаю, это то, что я пропустил при тестировании. Я не переназначал исключение в обработчике прослушивателя.

2. Спасибо. Именно то, что я ищу: D