Разделитель / агрегатор с возгоранием / забвением и таймаутом

#apache-camel

#apache-camel

Вопрос:

У нас есть процесс разделения, который отправляет сообщения в разные очереди. Существует другой процесс, который собирает и объединяет эти сообщения для дальнейшей обработки.

Мы хотим иметь тайм-аут между моментом разделения и агрегированием. Тайм-аут агрегации IIUC начинается с первого сообщения и сбрасывается после каждого агрегированного сообщения (он основан на интервале, а не для полного сообщения).

Какое лучшее решение для решения этой проблемы?

Комментарии:

1. Итак, вы хотите, чтобы агрегатор предполагал, что конкретная агрегация завершена, если с момента разделения сообщения прошло NNNN секунд? Это правильно?

2. Да, общее время разделения и агрегирования «messagebundle» может занимать не более NNNN секунд.

3. О, тогда я совершенно неправильно понял вопрос — я думал, вы хотите отложить агрегацию на некоторое время — я был совершенно не в курсе.

4. Самое близкое, о чем я мог подумать, это использовать completionInterval вместо completionTimeout и, возможно, посмотреть, можно ли его объединить с предикатом завершения; Я попробую завтра, если кто-то еще не поймет это за это время.

5. Мы подумывали о том, чтобы отправить какое-то фиктивное уведомление-сообщение в очередь агрегатора, которое начнет отсчет времени ожидания. Но все же остается проблема сброса таймаута с каждым новым сообщением.

Ответ №1:

Редактировать

Вот лучшее, что я смог придумать, хотя это немного взлом. Сначала вы сохраняете временную метку в качестве заголовка сообщения и публикуете ее в очереди вместе с телом:

 from("somewhere")
    .split(body())
    .process(e -> e.getIn().setHeader("aggregation_timeout", 
        ZonedDateTime.now().plusSeconds(COMPLETION_TIMEOUT)))
    .to("aggregation-route-uri");
  

Затем, при потреблении и агрегировании, вы используете пользовательскую стратегию агрегирования, которая сохранит aggregation_timeout из первого сообщения в текущей группе, а затем использует a, completionPredicate который считывает это значение, чтобы проверить, истек ли тайм-аут (альтернативно, если вы агрегируете таким образом, чтобы порядок сообщений оставался неизменным, вы могли бы простопрочитайте заголовок из первого сообщения). Используйте short completionTimeout в качестве меры предосторожности для случаев, когда интервал между двумя сообщениями большой:

 from("aggregation-route-uri")
    .aggregate(bySomething())
    .aggregationStrategy((oldExchange, newExchange) -> {
      // read aggregation_timeout header from first message 
      // and set it as property in grouped exchange
      // perform aggregation
    })
    .completionTimeout(1000) // intentionally low value, here as a safeguard
    .completionPredicate(e -> {
      // complete once the timeout has been reached
      return e.getProperty("aggregation_timeout", ZonedDateTime.class)
              .isAfter(ZonedDateTime.now());
    })
    .process(e -> // do something with aggregates);
  

Комментарии:

1. Я забыл упомянуть, что у разделителя и агрегатора есть свой собственный CamelContext, потому что мы ожидаем огромное количество сообщений и длительных задач от внешних систем. Информация об агрегации сохраняется. Я не думаю, что предлагаемое решение будет работать с разделенными текстами CamelContexts, верно?

2. Я не понимаю, почему это не должно работать, это не зависит от контекста

3. Все еще существует проблема, если пакет сообщений содержит только одно сообщение. Такое сообщение также должно обрабатываться (обработка ошибок) при возникновении тайм-аута. По крайней мере, это подтверждает, что пока нет готового шаблона.

4. Разве completionTimeout не обрабатывает сценарий с одним сообщением? Если пакет содержал только одно сообщение, оно будет объединено через секунду

5. Разделенное сообщение будет помещено в очередь, и от третьей стороны зависит, насколько быстро она сможет обработать это сообщение и ответить в нашей очереди-агрегаторе. Если это занимает слишком много времени, messagebundle должен выйти из строя.