#mule
#mule
Вопрос:
Пытаюсь найти правильный способ рекурсивного вызова потока Мула.
У нас есть поток, который создает массив работы для выполнения во время выполнения, затем рекурсивно вызывает себя, используя «Ссылку на поток» внутри блока «Для каждого». Проблема в том, что мы не выяснили правильный способ передачи параметров в этот рекурсивный поток, поэтому мы не получаем ожидаемых результатов.
Мы пытались передавать параметры, используя свойства потока (setInvocationParameter() в Groovy), но, похоже, они являются общими для нескольких экземпляров потока. Для примера у нас есть массив ForEach, который перебирает массив, содержащий [2. 3. 4] , но в зависимости от времени некоторые из этих значений теряются (обычно мы видим 2, затем 4 дважды — пропуская 3).
Мы пробовали разные стратегии обработки Mule, но безуспешно. Асинхронный режим очереди Mule по умолчанию имеет проблемы, описанные выше. Синхронный, похоже, вообще не работает (имеет смысл, поскольку для нашей рекурсивной модели, вероятно, требуется минимум два экземпляра для запуска).
Вот соответствующая часть конфигурационного XML (весь поток довольно большой). В конце потока это:
<foreach collection="#[sessionVars['actionArray']]"
counterVariableName="actionIndex"
rootMessageVariableName="actionVar" doc:name="For Each">
<scripting:component doc:name="Run Each Action">
<scripting:script engine="Groovy">
<![CDATA[def aa = message.getSessionProperty('actionArray')
def this_item = aa.get(message.getInvocationProperty('actionIndex'))
// Pass the desired action for the recursive call
message.setInvocationProperty('FlowAction', this_item)
log.info "Running $this_item" // <- Shows the correct item
return]]>
</scripting:script>
</scripting:component>
<flow-ref name="DoAction" doc:name="Do Action"/>
</foreach>
В начале потока есть регистратор, который отображает переменную потока «FlowAction». Когда мы тестируем с моим массивом [2, 3, 4], этот оператор регистратора запускается три раза (как и ожидалось), но обычно со значениями 2, 4 и 4.
Мы получаем те же результаты в Mule 3.7 и более старой системе 3.4, которая у нас есть (обе версии для сообщества).
Спасибо за любые предложения от мастеров Мула…
Комментарии:
1. Можете ли вы опубликовать свой xml?
Ответ №1:
Я не уверен, что это на 100% правильно, но вот что мы сделали…
Потратив много времени на то, чтобы заставить подход «Для каждого» и «Ссылка на поток» работать надежно, мы сдались и переключились на другую технику. Нашей альтернативой было удалить для каждого блока и рекурсивно управлять потоком из короткого скрипта Groovy:
. . .
// Invoke the flow recursively for every item in the array
Flow flow = muleContext.getRegistry().lookupFlowConstruct("flow name")
actions.each // actions is an array of integers built earlier
{ item->
MuleMessage msg = message.createInboundMessage()
DefaultMuleSession sess = new DefaultMuleSession(flow, muleContext)
DefaultMuleEvent event = new DefaultMuleEvent(msg, MessageExchangePattern.ONE_WAY, sess)
// Copy the current inbound properties to the new message
message.getInboundPropertyNames().each
{
event.getMessage().setProperty(it, message.getInboundProperty(it), PropertyScope.INBOUND)
}
// Copy the current session variables to the new message too
message.getSessionPropertyNames().each
{
event.setSessionVariable(it, message.getSessionProperty(it))
}
// Now set the item we want processed as a flow variable
event.setFlowVariable("Action", item.toString())
// Finally, trigger the flow (which runs asynchronously)
flow.process(event).getMessage()
}
Теперь это работает должным образом в нашей среде.