Политика повторной доставки ActiveMQ работает не так, как ожидалось

#scala #activemq

Вопрос:

Я провел много исследований и прочитал, и я просто хочу убедиться, что понимаю, как политика повторной доставки ActiveMQ работает.

Следующий код-Scala

Я создал соединение следующим образом:

 val connFactory: ActiveMQConnectionFactory = new ActiveMQConnectionFactory(s"tcp://$ACTIVE_MQ_HOST:61616")
connFactory.setTrustAllPackages(true)
connFactory.setAlwaysSyncSend(true)
 

Я создал соединение

 val connection: ActiveMQConnection = connFactory.createConnection().asInstanceOf[ActiveMQConnection]
 

Я создал политику повторной доставки

 val policy: RedeliveryPolicy = activeMQConnection.getRedeliveryPolicy()
policy.setInitialRedeliveryDelay(0)
policy.setRedeliveryDelay(1000);
policy.setUseExponentialBackOff(false)
policy.setMaximumRedeliveries(2)
 

Я установил политику повторной доставки для соединения и запустил соединение

 activeMQConnection.setRedeliveryPolicy(policy)
println(s"Redelivery connection policy: ${activeMQConnection.getRedeliveryPolicy}")
activeMQConnection.start()
 

Я создал сеанс

 val refCountQueueSession: Session = activeMQConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE)
 

Я создал производителя и потребителя в очереди

 val reference_count_queue: Queue = refCountQueueSession.createQueue("ReferenceCountQueue?consumer.exclusive=true")
val reference_count_queue_producer: MessageProducer = refCountQueueSession.createProducer(reference_count_queue)
val reference_count_consumer: MessageConsumer = refCountQueueSession.createConsumer(reference_count_queue)
reference_count_consumer.setMessageListener(new GroupConsumerMessageListener("Group Reference Count Consumer"))
 

Параметр Groupconsumessagelistener, который я реализовал, создает исключение ранее message.acknowledge()

Сообщение не удаляется из очереди, как предполагалось, из-за исключения, возникшего до подтверждения, однако политика повторной доставки не повторяется, я не вижу в журналах моего потребителя, что он снова пытается получить доступ из очереди.

Так я что-то упускаю? Должен ли я поддерживать бесконечный цикл там, где я это делаю

 session.recover()
 

?

До сих пор это единственный способ, которым я смог реализовать повторную доставку неподтвержденных сообщений, но мне кажется, что это неправильно.

Комментарии:

1. Какую версию ActiveMQ вы используете?