#jboss #jms #apache-camel #multicast
#jboss #jms #apache-camel #многоадресная рассылка
Вопрос:
У меня есть приложение, состоящее из множества модулей, развернутых в нескольких экземплярах JBoss. Эти модули передают свои номера версий друг другу через JMS.
Это работает следующим образом. Каждый модуль периодически транслирует свои версии следующим образом:
camelTemplate.asyncSendBody(moduleVersionsOutputChannel, new ArrayList<>(moduleVersions));
Вот определение moduleVersionsOutputChannel
:
<endpoint id="moduleVersionsOutputChannel" uri="direct:module.versions.output.channel"/>
Затем есть маршрут, который принимает сообщения от moduleVersionsOutputChannel
(см. source
Ниже) и отправляет сообщения в темы:
from(source).multicast()
.parallelProcessing().timeout(moduleVersionsMonitoringConfig.getMulticastTimeout())
.to(moduleVersionsMonitoringConfig.getChannels())
.id(moduleVersionsService.getCurrentModuleId() "-outbound");
Вначале все работает нормально, но через некоторое время (часы или дни) потоки, выполняющие многоадресную рассылку, блокируются. Я наблюдаю блоки в 2 разных точках.
Первый блок происходит в потоке, где asyncSendBody
вызывается:
"moduleVersionsBroadcastScheduler-1@57542" prio=5 tid=0x423 nid=NA waiting
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Unsafe.java:-1)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
at org.apache.camel.processor.MulticastProcessor.doProcessParallel(MulticastProcessor.java:328)
at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:214)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:163)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:51)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.UnitOfWorkProducer.process(UnitOfWorkProducer.java:73)
at org.apache.camel.impl.ProducerCache$2.doInProducer(ProducerCache.java:378)
at org.apache.camel.impl.ProducerCache$2.doInProducer(ProducerCache.java:346)
at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:242)
at org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:346)
at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:184)
at org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:124)
at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:137)
at org.apache.camel.impl.DefaultProducerTemplate$14.call(DefaultProducerTemplate.java:621)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy.rejectedExecution(ThreadPoolExecutor.java:2025)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:132)
at org.apache.camel.impl.DefaultProducerTemplate.asyncSendBody(DefaultProducerTemplate.java:626)
at ...
В другой раз выполнение блокируется на более позднем этапе, когда оно фактически пытается отправить сообщение в разделы JMS:
"Camel (moduleVersionsContext) thread #1044 - Multicast@52031" daemon prio=5 tid=0xa16 nid=NA waiting
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Unsafe.java:-1)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
at java.util.concurrent.Semaphore.acquire(Semaphore.java:472)
at org.hornetq.core.client.impl.ClientProducerCreditsImpl.acquireCredits(ClientProducerCreditsImpl.java:90)
at org.hornetq.core.client.impl.ClientProducerImpl.sendRegularMessage(ClientProducerImpl.java:307)
at org.hornetq.core.client.impl.ClientProducerImpl.doSend(ClientProducerImpl.java:288)
at org.hornetq.core.client.impl.ClientProducerImpl.send(ClientProducerImpl.java:140)
at org.hornetq.jms.client.HornetQMessageProducer.doSend(HornetQMessageProducer.java:438)
at org.hornetq.jms.client.HornetQMessageProducer.send(HornetQMessageProducer.java:205)
at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:589)
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSend(JmsConfiguration.java:336)
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:275)
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:217)
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:231)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:228)
at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:431)
at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:385)
at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:153)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:110)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:163)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:105)
at org.apache.camel.processor.MulticastProcessor.doProcessParallel(MulticastProcessor.java:712)
at org.apache.camel.processor.MulticastProcessor.access$200(MulticastProcessor.java:83)
at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:293)
at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:278)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Можете ли вы сказать мне, есть ли у меня ошибка в конфигурации Camel, или это известная проблема с Camel, и есть какой-то обходной путь?
Моя среда:
- Apache Camel 2.12.3
- JBoss 6.1 EAP
- OpenJDK 1.7.0_55
- Ubuntu Linux 12.04 LTS
Комментарии:
1. Вы нашли решение этой проблемы?
Ответ №1:
У меня была похожая проблема. Попробуйте настроить конечную точку производителя cxf на синхронную. Это решило мою проблему
Комментарии:
1. Спасибо за ваше предложение, однако я не использую здесь никаких конечных точек cxf.