Блокирующий опросник Spring Integration, асинхронный нисходящий поток, сигнал завершения потока

#spring-integration

#spring-интеграция

Вопрос:

Я пытаюсь настроить опросник, который запрашивает компонент для получения списка в канале каждые X секунд. Этот канал имеет поток вниз по потоку, который разделяет список и выводит на канал pub / sub (дальнейший асинхронный поток) Как я могу убедиться, что в любой момент времени выполняется только один раз, и опросник должен ждать / блокировать до завершения потока, пока он не будет готов к следующему опросу (фиксированная скорость / задержка)?

 <int:channel id="configListChannel" />
<task:executor id="pollExecutor" pool-size="1" queue-capacity="1" rejection-policy="ABORT" />
<int:inbound-channel-adapter expression="configMap().values()" auto-startup="true" channel="configListChannel">
    <int:poller fixed-delay="30" time-unit="SECONDS" task-executor="pollExecutor"/>
</int:inbound-channel-adapter>

<task:executor id="configExecutor" pool-size="5"/>
<int:channel id="configChannel" >
    <int:dispatcher task-executor="configExecutor"/>
</int:channel>
<int:chain input-channel="configListChannel" output-channel="configChannel" id="configChain">
    <int:splitter/>
    <int:filter expression="payload.enablePolling"/>
</int:chain>
  

… дальнейший асинхронный поток включен configChannel для отправки исходящих сообщений

Есть ли примеры блокирующего опросника с отключением асинхронной передачи и использованием барьера для завершения потока сигналов в поток опроса? Также только один опрос за раз.

Ответ №1:

Я бы посоветовал вам реализовать ReceiveMessageAdvice (начиная с 5.3 или AbstractMessageSourceAdvice иначе). Он afterReceive() должен просто возвращать сообщение как есть, но beforeReceive() должен проверять некоторое состояние и возвращать false , если вы не можете выполнить опрос в данный момент.

Вероятно, вам не нужен барьер для этой задачи, но простой AtomicBoolean компонент для проверки состояния в этом beforeReceive() to false и возврата к true , когда вы завершите свою задачу ниже по потоку.

Комментарии:

1. В beforReceive() как я могу проверить состояние, которое в моем случае является завершением потока (асинхронная отправка sqs в несколько очередей), как отслеживать все подтверждения?.

2. Вероятно, вам следует собрать эти дополнения в агрегаторе, а затем изменить AtomicBoolean на true .

3. Я так и думал. Кроме того, поскольку у меня нет разделителя, мне придется использовать пользовательскую стратегию корреляции групп сообщений. Я отправляю сообщения в цикле, используя шлюз. и затем поток шлюза отправляет его в канал pub / sub с 2 конечными точками SQS для 2 разных очередей. Любое другое решение? Все это я могу сделать, используя код Future и Java, но я хотел бы использовать потоки, поскольку это дает мне такое поведение из коробки.

4. Конечно! вы можете ввести любую пользовательскую стратегию корреляции, чтобы объединить эти сообщения в группу.

5. В итоге получен совет по получению, как и предлагалось, см. РЕДАКТИРОВАНИЕ 2