Spring Kafka: применение KafkaListenerErrorHandler ко всем KafkaListener

#java #spring #apache-kafka #error-handling #spring-kafka

#java #весна #apache-kafka #обработка ошибок #spring-kafka

Вопрос:

Я хочу перехватить ошибки проверки (MethodArgumentNotValidException), вызванные моим @KafkaListener. Для достижения этой цели я создал DefaultKafkaValidationErrorHandler, который реализует KafkaListenerErrorHandler:

 @Component
public class DefaultKafkaValidationErrorHandler implements KafkaListenerErrorHandler {


    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
       //here is some logic
    }

    @Override
    public final Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        //such as here
    }
}
 

И установите валидатор для регистратора:

 @Component
public class KafkaListenerConfig implements KafkaListenerConfigurer {
    @Autowired
    private LocalValidatorFactoryBean validator;

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setValidator(this.validator);
    }

}
 

Он отлично работает, пока я передаю этот обработчик своему слушателю напрямую

 @KafkaListener(
            containerFactory = "factory",
            topics = "topic",
            groupId = "group.id",
            errorHandler = "defaultKafkaValidationErrorHandler")

 

Мой вопрос в том, есть ли какой-нибудь способ применить мой DefaultKafkaValidationErrorHandler ко всем @KafkaListener в моем проекте, а не напрямую, как упоминалось. Может быть, через ConcurrentKafkaListenerContainerFactory, KafkaListenerEndpointRegistrar или что-то еще

Ответ №1:

Вы можете переопределить C createListenerContainer(KafkaListenerEndpoint endpoint); метод ConcurrentKafkaListenerContainerFactory и установить свой глобальный KafkaListenerErrorHandler для всего одного endpoint . Ваша фабрика должна быть зарегистрирована под именем KafkaListenerAnnotationBeanPostProcessor.DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME компонента, чтобы избежать настройки этой фабрики на каждом отдельном @KafkaListener .