#java #spring #apache-kafka #error-handling #spring-kafka
#java #весна #apache-kafka #обработка ошибок #spring-kafka
Вопрос:
Я хочу перехватить ошибки проверки (MethodArgumentNotValidException), вызванные моим @KafkaListener. Для достижения этой цели я создал DefaultKafkaValidationErrorHandler, который реализует KafkaListenerErrorHandler:
@Component
public class DefaultKafkaValidationErrorHandler implements KafkaListenerErrorHandler {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
//here is some logic
}
@Override
public final Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
//such as here
}
}
И установите валидатор для регистратора:
@Component
public class KafkaListenerConfig implements KafkaListenerConfigurer {
@Autowired
private LocalValidatorFactoryBean validator;
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}
}
Он отлично работает, пока я передаю этот обработчик своему слушателю напрямую
@KafkaListener(
containerFactory = "factory",
topics = "topic",
groupId = "group.id",
errorHandler = "defaultKafkaValidationErrorHandler")
Мой вопрос в том, есть ли какой-нибудь способ применить мой DefaultKafkaValidationErrorHandler ко всем @KafkaListener в моем проекте, а не напрямую, как упоминалось. Может быть, через ConcurrentKafkaListenerContainerFactory, KafkaListenerEndpointRegistrar или что-то еще
Ответ №1:
Вы можете переопределить C createListenerContainer(KafkaListenerEndpoint endpoint);
метод ConcurrentKafkaListenerContainerFactory
и установить свой глобальный KafkaListenerErrorHandler
для всего одного endpoint
. Ваша фабрика должна быть зарегистрирована под именем KafkaListenerAnnotationBeanPostProcessor.DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME
компонента, чтобы избежать настройки этой фабрики на каждом отдельном @KafkaListener
.