#spring #mule #ibm-mq #spring-jms #mule-component
#spring #mule #ibm-mq #spring-jms #mule-компонент
Вопрос:
В настоящее время я создаю серверное приложение Mule ESB, которое использует соединитель jms запроса-ответа. Поскольку он используется в среде с высокой степенью параллелизма, мы включили spring jms cache в нашей конфигурации MQ.
<spring:beans>
<mule>
<!-- MQ Factory -->
<spring:bean id="testMsgMqFactoryBean1" name="testMsgMqFactory1" class="com.ibm.mq.jms.MQQueueConnectionFactory">
<spring:property name="channel" value="${test.msg.mq.channel.1}" />
<spring:property name="queueManager" value="${test.msg.mq.queueManager.1}" />
<spring:property name="hostName" value="${test.msg.mq.hostName.1}" />
<spring:property name="port" value="${test.msg.mq.port.1}" />
<spring:property name="transportType" value="${mq.jms.transportType}" />
</spring:bean>
<spring:bean id="testMsgMqFactoryBeanCache1" class="org.springframework.jms.connection.CachingConnectionFactory">
<spring:property name="targetConnectionFactory" ref="testMsgMqFactoryBean1" />
<spring:property name="sessionCacheSize" value="${test.threading.profile.maxThreadsActive}" />
<spring:property name="cacheConsumers" value="false" />
<!-- <spring:property name="cacheProducers" value="false" /> -->
</spring:bean>
<!-- MQ Connector 1 -->
<jms:custom-connector name="testMsgMqConnector.1" class="org.mule.transport.jms.websphere.WebsphereJmsConnector" doc:name="Custom JMS">
<spring:property name="specification" value="1.1" />
<spring:property name="connectionFactory" ref="testMsgMqFactoryBeanCache1" />
<spring:property name="persistentDelivery" value="false" />
<spring:property name="disableTemporaryReplyToDestinations" value="true" />
<spring:property name="numberOfConsumers" value="${test.threading.profile.maxThreadsActive}" />
<spring:property name="maxRedelivery" value="-1" />
<receiver-threading-profile maxThreadsActive="${test.threading.profile.maxThreadsActive}" maxBufferSize="${test.threading.profile.maxBufferSize}" maxThreadsIdle="${test.threading.profile.maxThreadsIdle}"/>
<reconnect frequency="${mq.jms.reconnection.frequency}" count="${mq.jms.reconnection.count}" blocking="false" />
</jms:custom-connector>
<!-- msgworks inbound and outbound MQ setup -->
<!-- Rewards -->
<jms:endpoint exchange-pattern="request-response" queue="${test.msg.mq.inbound.account.queue}" name="testQueue1" connector-ref="testMsgMqConnector.1" doc:name="JMS" />
</mule>
</spring:beans>
Эта конфигурация работает нормально, когда клиент использует статическую очередь ответов. Однако у нас есть некоторые клиенты, которые используют динамическую / временную очередь ответов. Поскольку org.springframework.jms.connection.CachingConnectionFactory
производители кэшируются, для каждой временной очереди ответа объект производителя кэшируется и никогда не закрывается. После обработки сотен запросов приложение начало выдавать исключения:
********************************************************************************
Message : Failed to create and dispatch response event over Jms destination "queue://QMGR1/TESTret5a975v53AF980F2006BE02?targetClient=1". Failed to route event via endpoint: null. Message payload is of type: JMSTextMessage
Code : MULE_ERROR-42999
--------------------------------------------------------------------------------
Exception stack is:
1. MQJE001: Completion Code 2, Reason 2017 (com.ibm.mq.MQException)
com.ibm.mq.MQQueueManager:2808 (null)
2. MQJMS2008: failed to open MQ queue TESTret5a975v53AF980F2006BE02(JMS Code: MQJMS2008) (javax.jms.ResourceAllocationException)
com.ibm.mq.jms.MQQueueServices:398 (http://java.sun.com/j2ee/sdk_1.3/techdocs/api/javax/jms/ResourceAllocationException.html)
3. Failed to create and dispatch response event over Jms destination "queue://QMGR1/TESTret5a975v53AF980F2006BE02?targetClient=1". Failed to route event via endpoint: null. Message payload is of type: JMSTextMessage (org.mule.api.transport.DispatchException)
org.mule.transport.jms.JmsReplyToHandler:173 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html)
После изучения кода ошибки MQ (MQJE001: код завершения 2, причина 2017) я обнаружил, что причина этой ошибки в том, что мы никогда не закрывали производителей, а производители исчерпали дескрипторы MQ в диспетчере очередей. Быстрое и простое решение заключается в раскомментировании строки в конфигурации spring jms cache, чтобы закрывать производителей каждый раз.
<spring:bean id="testMsgMqFactoryBeanCache1" class="org.springframework.jms.connection.CachingConnectionFactory">
<spring:property name="targetConnectionFactory" ref="testMsgMqFactoryBean1" />
<spring:property name="sessionCacheSize" value="${test.threading.profile.maxThreadsActive}" />
<spring:property name="cacheConsumers" value="false" />
<spring:property name="cacheProducers" value="false" />
</spring:bean>
Теперь я не вижу проблемы с MQ, но столкнулся с другой проблемой производительности, потому что производители не кэшируются, поэтому каждый раз создается новый производитель.
Мой вопрос в том, как справиться с этим сценарием? Поскольку клиент не будет изменять способ получения ответных сообщений из временных очередей, как мы можем избежать исчерпания обработчиков MQ, не влияя на производительность.
Большое вам спасибо — Lei
Комментарии:
1. Это новое приложение заменит устаревшее приложение, которое использует компоненты, управляемые сообщениями. Устаревшее приложение обрабатывает запросы тех же клиентов с динамическими очередями ответов, и у него нет проблем с производительностью. Устаревшее приложение также не кэширует отправителей (производителей). Есть идеи?
2. попробуйте { QueueSession session = mqSession.getSession(); replyToQueue = createReplyToQueue(сессия); отправитель = session.createSender(replyToQueue); Текстовое сообщение message = session.createTextMessage(вызов.GetResponse()); message.setJMSCorrelationID(getJmsRequest(). getJMSMessageID()); getLogger().logDebug(имя_меТода, «Отправляющий ответ на [» replyToQueue «]»); sender.send(сообщение); getLogger().logDebug(имя_меТода, «Отправляющий ответ на [» replyToQueue «]»); setJmsResponse(сообщение); return; } наконец { JmsUtil.closeJmsResource(отправитель); }
3. Вы знаете, почему производитель никогда не закрывается? Вы создаете это как часть своего собственного кода или mule создает его? Если это так, возможно, стоит сообщить об этом в службу отслеживания проблем.
4. Стефан, это не Mule не закрывает производителей, это потому, что мы использовали spring jms CachingConnectionFactory. Он кэширует сеансовые и прокси-интерфейсы jms API для целей кэширования (потребители, производители).
5. Я в замешательстве. В какой-то момент вам нужно закрыть сеанс, и это все исправит. Тот факт, что
CachingConnectionFactory
они кэшируются как не имеющие никакого отношения к проблеме. Единственное, что это делает, — это убедиться, что вам возвращен кэшированный производитель для вашей текущей транзакции, но вы (или Mule) несете ответственность за закрытие сеанса! (проверьтеphysicalClose()
в этом классе)
Ответ №1:
Это очень интересный вариант использования. Однако, я боюсь, что нет ничего готового для исправления этого. Есть более очевидные решения: отключить кэширование или расширить spring cache provider.
Временные очереди и производительность — это определенно не две вещи, которые вы можете иметь одновременно. Я бы предложил другую возможность:
Если вы используете временные очереди, чтобы ответы возвращались только данному потребителю, возможно, в дополнение к удалению старых сообщений при повторном подключении:
Вы могли бы использовать хорошо известную очередь для ответов, используя комбинацию заголовка с именем хоста, который должен получать сообщение, плюс селектор, различный для каждого узла-потребителя, и TTL для отправляемых сообщений, чтобы они исчезали через некоторое время.