#spring-integration
#spring-интеграция
Вопрос:
Я пытаюсь настроить опросник, который запрашивает компонент для получения списка в канале каждые X секунд. Этот канал имеет поток вниз по потоку, который разделяет список и выводит на канал pub / sub (дальнейший асинхронный поток) Как я могу убедиться, что в любой момент времени выполняется только один раз, и опросник должен ждать / блокировать до завершения потока, пока он не будет готов к следующему опросу (фиксированная скорость / задержка)?
<int:channel id="configListChannel" />
<task:executor id="pollExecutor" pool-size="1" queue-capacity="1" rejection-policy="ABORT" />
<int:inbound-channel-adapter expression="configMap().values()" auto-startup="true" channel="configListChannel">
<int:poller fixed-delay="30" time-unit="SECONDS" task-executor="pollExecutor"/>
</int:inbound-channel-adapter>
<task:executor id="configExecutor" pool-size="5"/>
<int:channel id="configChannel" >
<int:dispatcher task-executor="configExecutor"/>
</int:channel>
<int:chain input-channel="configListChannel" output-channel="configChannel" id="configChain">
<int:splitter/>
<int:filter expression="payload.enablePolling"/>
</int:chain>
… дальнейший асинхронный поток включен configChannel
для отправки исходящих сообщений
Есть ли примеры блокирующего опросника с отключением асинхронной передачи и использованием барьера для завершения потока сигналов в поток опроса? Также только один опрос за раз.
Ответ №1:
Я бы посоветовал вам реализовать ReceiveMessageAdvice
(начиная с 5.3 или AbstractMessageSourceAdvice
иначе). Он afterReceive()
должен просто возвращать сообщение как есть, но beforeReceive()
должен проверять некоторое состояние и возвращать false
, если вы не можете выполнить опрос в данный момент.
Вероятно, вам не нужен барьер для этой задачи, но простой AtomicBoolean
компонент для проверки состояния в этом beforeReceive()
to false
и возврата к true
, когда вы завершите свою задачу ниже по потоку.
Комментарии:
1. В
beforReceive()
как я могу проверить состояние, которое в моем случае является завершением потока (асинхронная отправка sqs в несколько очередей), как отслеживать все подтверждения?.2. Вероятно, вам следует собрать эти дополнения в агрегаторе, а затем изменить
AtomicBoolean
наtrue
.3. Я так и думал. Кроме того, поскольку у меня нет разделителя, мне придется использовать пользовательскую стратегию корреляции групп сообщений. Я отправляю сообщения в цикле, используя шлюз. и затем поток шлюза отправляет его в канал pub / sub с 2 конечными точками SQS для 2 разных очередей. Любое другое решение? Все это я могу сделать, используя код Future и Java, но я хотел бы использовать потоки, поскольку это дает мне такое поведение из коробки.
4. Конечно! вы можете ввести любую пользовательскую стратегию корреляции, чтобы объединить эти сообщения в группу.
5. В итоге получен совет по получению, как и предлагалось, см. РЕДАКТИРОВАНИЕ 2