Код после разделителя со стратегией агрегирования не выполняется, если было обработано исключение во внутреннем маршруте (Apache Camel)

#apache-camel

#apache-camel

Вопрос:

Я столкнулся с поведением, которое я не могу понять. Эта проблема возникает, когда выполняется разделение с помощью AggregationStrategy, и во время одной из итераций возникает исключение. Исключение возникает внутри разделителя на другом маршруте (прямая конечная точка, которая вызывается для каждой итерации). Похоже, что выполнение маршрута останавливается сразу после разделения.

Вот пример кода.

Это маршрут, который создает один отчет для каждого клиента и собирает имена файлов для внутренней статистики.

 
    @Component
    @RequiredArgsConstructor
    @FieldDefaults(level = PRIVATE, makeFinal = true)
    public class ReportRouteBuilder extends RouteBuilder {
    
      ClientRepository clientRepository;
    
      @Override
      public void configure() throws Exception {
        errorHandler(deadLetterChannel("direct:handleError")); //handles an error, adds error message to internal error collector for statistic and writes log
    
        from("direct:generateReports")
        
            .setProperty("reportTask", body()) //at this point there is in the body an object of type ReportTask, containig all data required for building report
    
            .bean(clientRepository, "getAllClients") // Body is a List<Client>
            
            .split(body())
              .aggregationStrategy(new FileNamesListAggregationStrategy())
              .to("direct:generateReportForClient") // creates report which is saved in the file system. uses the same error handler
            .end()
            
            //when an exception occurs during split then code after splitter is not executed
    
            .log("Finished generating reports. Files created ${body}"); // Body has to be List<String> with file names.
      }
    }

  

Стратегия агрегации довольно проста — она просто извлекает имя файла. Если заголовок отсутствует, он возвращает NULL.

 
    public class FileNamesListAggregationStrategy extends AbstractListAggregationStrategy<String> {
    
      @Override
      public String getValue(Exchange exchange) {
        Message inMessage = exchange.getIn();
        return inMessage.getHeader(Exchange.FILE_NAME, String.class);
      }
    }

  

Когда после разделения все идет гладко, в основном списке отображаются все имена файлов. Но когда в маршруте «direct: generateReportForClient» возникло какое-то исключение (я добавил симуляцию ошибки для одного клиента), чем в агрегированном теле просто на одно имя файла меньше — все в порядке (все было агрегировано правильно).

НО сразу после разделения выполнение маршрута останавливается, и результат, который находится в теле на данный момент (список с именами файлов), возвращается клиенту (FluentProducer), который ожидает ReportTask в качестве тела ответа.

и он пытается преобразовать список значений (агрегированный результат) в ReportTask и вызывает org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type

Почему маршрут прерывается после разделения? Все ошибки были обработаны, и агрегация завершена правильно.

PS Я прочитал книгу Camel In Action и документацию о разделителе, но я не нашел ответа.

Проект PPS выполняется на Spring Boot 2.3.1 и Camel 3.3.0

ОБНОВЛЕНИЕ Этот маршрут запускается FluentProducerTemplate

     ReportTask processedReportTask = producer.to("direct:generateReports")
            .withBody(reportTask)
            .request(ReportTask.class);

  

Ответ №1:

Проблема заключается в обработчике ошибок пользовательской стратегии агрегирования в разделении.

Из книги действий Camel (5.3.5):

ПРЕДУПРЕЖДЕНИЕ При использовании пользовательской стратегии агрегации с разделителем важно знать, что вы несете ответственность за обработку исключений. Если вы не распространяете исключение обратно, разделитель предположит, что вы обработали исключение, и проигнорирует его.

В вашем коде вы используете стратегию агрегации, расширенную от AbstractListAggregationStrategy . Давайте рассмотрим aggregate метод в AbstractListAggregationStrategy :

 @Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    List<V> list;

    if (oldExchange == null) {
        list = getList(newExchange);
    } else {
        list = getList(oldExchange);
    }

    if (newExchange != null) {
        V value = getValue(newExchange);
        if (value != null) {
            list.add(value);
        }
    }

    return oldExchange != null ? oldExchange : newExchange;
}

  

Если первый обмен обработан обработчиком ошибок, мы будем иметь в результате exchange ( newExchange ) количество свойств, заданных обработчиком ошибок ( Exchange.EXCEPTION_CAUGHT, Exchange.FAILURE_ENDPOINT, Exchange.ERRORHANDLER_HANDLED and Exchange.FAILURE_HANDLED ) и exchange.errorHandlerHandled=true . Методы getErrorHandlerHandled()/setErrorHandlerHandled(Boolean errorHandlerHandled) доступны в ExtendedExchange интерфейсе.

В этом случае ваше разделение завершается обменом с errorHandlerHandled=true и прерывает маршрут.

Причина описана в руководстве по исключению camel

Если обработано значение true, то генерируемое исключение будет обработано, и Camel не продолжит маршрутизацию по исходному маршруту, а прервется.

Чтобы предотвратить такое поведение, вы можете привести свой exchange к ExtendedExchange и установить errorHandlerHandled=false в aggregate методе стратегии агрегирования. И ваш маршрут не будет прерван, а будет продолжен.

   @Override
  public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    Exchange aggregatedExchange = super.aggregate(oldExchange, newExchange);
    ((ExtendedExchange) aggregatedExchange).setErrorHandlerHandled(false);
    return aggregatedExchange;
  }
  

Сложная ситуация заключается в том, что если у вас есть exchange, обработанный обработчиком ошибок как не первый в вашей стратегии агрегирования, вы не столкнетесь с какой-либо проблемой. Потому что camel будет использовать первый обмен (без errorHandlerHandled=true ) в качестве основы для агрегации.

Комментарии:

1. Поскольку я столкнулся с этой проблемой с Camel версии 2, я решил поделиться тем, как решить эту проблему и с Camel версии 2. Я нашел полезные фрагменты кода в руководстве по обновлению Apache Camel 3.x для версии 2 исправление выглядит так: oldExchange.setProperty(Exchange.ERRORHANDLER_HANDLED, false); А для версии 3 лучше использовать adapt() вместо приведения: oldExchange.adapt(ExtendedExchange.class) .setErrorHandlerHandled(false);

2. Еще одно обновление. По крайней мере, в Camel 3.10.0 (возможно, немного раньше) структура свойств exchange была изменена. Теперь свойства «system» (означает «camel») хранятся отдельно от свойств «users». Вам нужно добавить код: aggregatedExchange.setProperty(FAILURE_HANDLED, false);