#scala #activemq
Вопрос:
Я провел много исследований и прочитал, и я просто хочу убедиться, что понимаю, как политика повторной доставки ActiveMQ работает.
Следующий код-Scala
Я создал соединение следующим образом:
val connFactory: ActiveMQConnectionFactory = new ActiveMQConnectionFactory(s"tcp://$ACTIVE_MQ_HOST:61616")
connFactory.setTrustAllPackages(true)
connFactory.setAlwaysSyncSend(true)
Я создал соединение
val connection: ActiveMQConnection = connFactory.createConnection().asInstanceOf[ActiveMQConnection]
Я создал политику повторной доставки
val policy: RedeliveryPolicy = activeMQConnection.getRedeliveryPolicy()
policy.setInitialRedeliveryDelay(0)
policy.setRedeliveryDelay(1000);
policy.setUseExponentialBackOff(false)
policy.setMaximumRedeliveries(2)
Я установил политику повторной доставки для соединения и запустил соединение
activeMQConnection.setRedeliveryPolicy(policy)
println(s"Redelivery connection policy: ${activeMQConnection.getRedeliveryPolicy}")
activeMQConnection.start()
Я создал сеанс
val refCountQueueSession: Session = activeMQConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE)
Я создал производителя и потребителя в очереди
val reference_count_queue: Queue = refCountQueueSession.createQueue("ReferenceCountQueue?consumer.exclusive=true")
val reference_count_queue_producer: MessageProducer = refCountQueueSession.createProducer(reference_count_queue)
val reference_count_consumer: MessageConsumer = refCountQueueSession.createConsumer(reference_count_queue)
reference_count_consumer.setMessageListener(new GroupConsumerMessageListener("Group Reference Count Consumer"))
Параметр Groupconsumessagelistener, который я реализовал, создает исключение ранее message.acknowledge()
Сообщение не удаляется из очереди, как предполагалось, из-за исключения, возникшего до подтверждения, однако политика повторной доставки не повторяется, я не вижу в журналах моего потребителя, что он снова пытается получить доступ из очереди.
Так я что-то упускаю? Должен ли я поддерживать бесконечный цикл там, где я это делаю
session.recover()
?
До сих пор это единственный способ, которым я смог реализовать повторную доставку неподтвержденных сообщений, но мне кажется, что это неправильно.
Комментарии:
1. Какую версию ActiveMQ вы используете?