#apache-camel
#apache-camel
Вопрос:
Мой верблюжий маршрут использует и отправляет из / в JMS транзакционным способом. Наше требование — отбросить вредоносное сообщение, если оно не обрабатывается несколько раз. Я знаю, что гораздо лучшим вариантом было бы переместить сообщение в очередь мертвых писем, но для целей этого упражнения его отбрасывание просто хорошо.
Ниже приведено определение маршрута для имитации проблемы:
package com.my.comp.playground;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() {
onException(Exception.class)
.process(new Processor() {
private int failureCounter = 0;
@Override
public void process(Exchange exchange) {
exchange.getIn().setHeader("failureCounter", failureCounter);
}
})
.log("failureCounter = ${header.failureCounter}")
//.handled(true);
.handled(header("failureCounter").isGreaterThan(3));
from("jms:test.queue")
.routeId("test-route")
.transacted()
.process(exchange -> {
throw new RuntimeException("No good Pal!");
})
.to("mock:discard");
}
}
Итак, что я пытаюсь сделать, это сохранить счетчик сбоев, и если этот счетчик больше определенного числа, отметьте исключение как обработанное и зафиксируйте транзакцию.
Обратите внимание на две строки кода в конце обработки исключений:
//.handled(true);
.handled(header("failureCounter").isGreaterThan(3));
Когда я запускаю свой маршрут с header("failureCounter").isGreaterThan(3)
обработанным условием, сообщение откатывается снова и снова навсегда, и я вижу в журналах корректное увеличение failureCounter:
...
[mer[test.queue]] test-route : failureCounter = 402
[mer[test.queue]] o.a.c.p.e.DefaultErrorHandler : Failed delivery for (MessageId: ...
...
[mer[test.queue]] test-route : failureCounter = 403
...
[mer[test.queue]] test-route : failureCounter = 404
...
Однако, когда я запускаю маршрут с true
обработанным условием, транзакция сразу же фиксируется после первого сбоя, как показано ниже:
[mer[test.queue]] test-route : failureCounter = 1
[mer[test.queue]] o.a.c.s.spi.TransactionErrorHandler : Transaction commit (0x52b2f795) redelivered(true)
Итак, мой вопрос таков: я делаю что-то неправильно или мое понимание того, как использовать handled
exception, неверно? Если да, то каков был бы правильный путь?
Ответ №1:
Я понятия не имею, является ли это преднамеренным или ошибкой.
Когда я отлаживаю ваш случай, я вижу, что предикат in handled()
вычисляется на основе верблюжьего обмена.
Однако ваш failureCounter
заголовок отсутствует в обмене. Поэтому выражение header("failureCounter")
вычисляется null
как, и ваш предикат всегда false
.
В коротком тесте я увидел, что заголовки, установленные перед исключением, присутствуют, но заголовки, установленные после исключения (т. Е. Установленные Внутри обработчика ошибок), Отсутствуют в Exchange, который используется для оценки предиката.
Комментарии:
1. Я не думаю, что ваш заголовок failureCounter, который отсутствует в обмене , является истинным. Как вы можете видеть из журнала, увеличивающийся счетчик сбоев записывается в журналы. И этот журнал находится непосредственно перед оценкой обработанного предиката.
2. Да, это еще один странный момент, который я действительно не понимаю. В этой строке журнала корректно отображается увеличенное значение
failureCounter
. Но если вы скопируете эту строку журнала перед своим процессором, он регистрирует только текст без значения заголовка. Он ведет себя так, как если бы приращение сделало заголовок «видимым».3. Я отладил вниз
BinaryPredicateSupport.matchesReturningFailureMessage
. Происходит вычисление предикатаhandled
. Левая сторона (header("failureCounter")
) принимает значение null. Кстати, по крайней мере, в моем случае я использую Camel 2.x для этого теста.4. Спасибо. Ваш комментарий указал мне на то, чтобы выяснить первопричину, и как только я понял, что происходит, решение было простым. Опубликует это как отдельный ответ на случай, если у кого-то еще возникнет аналогичная проблема.
Ответ №2:
Комментарии Бурки были хорошими наблюдениями, которые помогли мне определить основную причину.
Несмотря на то, что camel DSL предполагает .handled(header("failureCounter").isGreaterThan(3))
, что проверка произойдет после запуска процессора увеличения счетчика сбоев и вывода журнала, на самом деле .handled
это первое, что нужно оценить в этой onException(...)
ветке. Это кажется немного вводящим в заблуждение, и я должен признать, что я немного разочарован таким подходом. Однако это должно быть причиной.
Учитывая, что camel создаст новое Exchange
для каждой доставки сообщения, неудивительно, что это приведет к откату навсегда.
Для решения этой проблемы есть два решения, которые сразу пришли мне в голову.
Решение 1. Оно основано на том факте, что посредник сообщений установит заголовок JMSRedelivered
сообщения, поэтому можно увеличить счетчик сообщений на основе этого. Таким образом, новая конфигурация маршрута будет выглядеть следующим образом:
@Override
public void configure() {
onException(Exception.class)
.handled(header("failureCounter").isGreaterThan(3));
from("jms:test.queue")
.routeId("test-route")
.transacted()
.process(new Processor() {
private int failureCounter = 0;
@Override
public void process(Exchange exchange) throws Exception {
if (exchange.getIn().getHeader("JMSRedelivered", Boolean.class)) {
exchange.getIn().setHeader("failureCounter", failureCounter);
}
}
})
.log("failureCounter = ${header.failureCounter}")
.process(exchange -> {
throw new RuntimeException("No good Pal!");
})
.to("mock:discard");
}
После применения этого изменения сообщение будет удалено после более чем трех последовательных сбоев в доставке сообщения, как показано в журналах ниже:
46849 --- [mer[test.queue]] test-route : failureCounter = 4
46849 --- [mer[test.queue]] o.a.c.s.spi.TransactionErrorHandler : Transaction commit (0x31b273b2) redelivered(true) for (MessageId: ID:414d5120514d31202020202020202020c083da5f02bd1d22 on ExchangeId: ID-C02XN27DJG5H-1608604365426-0-4))
Решение 2: это еще проще, но в данном случае это специфично для брокера сообщений IBM MQ.
@Override
public void configure() {
onException(Exception.class)
.log("failureCounter = ${header.JMSXDeliveryCount}")
.handled(header("JMSXDeliveryCount").isGreaterThan(3));
from("jms:test.queue")
.routeId("test-route")
.transacted()
.process(exchange -> {
throw new RuntimeException("No good Pal!");
})
.to("mock:discard");
}
И журналы снова покажут, что сообщение доставляется четыре раза, прежде чем сдаться.