Лучшая Практика Весенней Интеграции Для Получения Сообщений Из Канала Очереди

#java #spring #spring-integration

Вопрос:

Я изо всех сил пытаюсь обработать сообщения, которые находятся в канале очереди. Активатор службы запускается не сразу. Мне нужно как можно скорее сработать, когда сообщение придет на канал демоинпутешествия.

У меня есть канал, который является каналом очереди.

 <int:channel id="demoInputChannel">
    <int:queue/>
</int:channel>
 

У меня есть активатор службы для обработки сообщений, которые находятся в демоинпутчанале. Это такое определение, как это:

 <int:service-activator input-channel="demoInputChannel"
                       output-channel="demoOutputChannel"
                       ref="demoService"
                       method="demoMethod"
                       requires-reply="true">
 

У меня также есть опросник, который определен глобально в проекте:

 <int:poller default="true" fixed-delay="0" max-messages-per-poll="1"/>
 

Как можно немедленно запустить активатор службы?

ПРАВКА: Я изменил канал вывода на такой, чтобы убедиться, что он использует специальный опросник.

 @Bean
QueueChannel demoInputChannel() {
    return new QueueChannel(10);
}

@ServiceActivator(inputChannel = "demoInputChannel", outputChannel = "demoOutputChannel")
public Message<List<DataElement>> handle(Message<Long> in) throws Exception {
    System.out.println("item.getPayload "   in.getPayload());
    return autowiredObject.autowiredObjectMethod(in);
}
 

ПРАВКА 2:

Вот мой метод разделения:

 @Splitter(inputChannel = "splitterInputChannel", outputChannel = "demoInputChannel")
public List<Message<Long>> splitterHandle(Message<?> message) {
    List<Message<Long>> myList = new ArrayList<>();
    List<Long> idList = (List<Long>) message.getPayload();
    for (Long l : idList) {
        myList.add(org.springframework.integration.support.MessageBuilder.withPayload(l).build());
    }
    return myList;
}
 

Ответ №1:

Не делай max-messages-per-poll="1" этого . Используйте -1 , чтобы всегда обрабатывать сообщения из очереди немедленно, когда они там находятся. Если сообщений нет, задача опроса перейдет в спящий режим на этот fixed-delay тайм-аут. Но поскольку все распределено между разными потоками, действительно будет некоторая задержка.

Смотрите дополнительную информацию в документах: https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-adapter-namespace-inbound. Этот mportant: Poller Configuration абзац.

Обновить

Вот модульный тест, демонстрирующий, что все работает так, как ожидалось:

 @SpringJUnitConfig
public class So69573293Tests {

    @Autowired
    MessageChannel inputChannel;

    @Test
    void testConsumptionFromQueue() throws InterruptedException {
        String payload = IntStream.range(0, 10).mapToObj(Integer::toString).collect(Collectors.joining(","));
        this.inputChannel.send(new GenericMessage<>(payload));

        Thread.sleep(1000);
    }

    @Configuration
    @EnableIntegration
    public static class Config {

        @Splitter(inputChannel = "inputChannel", outputChannel = "demoInputChannel")
        List<Long> split(String payload) {
            return Arrays.stream(payload.split(","))
                    .map(Long::parseLong)
                    .collect(Collectors.toList());
        }


        @Bean
        QueueChannel demoInputChannel() {
            return new QueueChannel(10);
        }

        @ServiceActivator(inputChannel = "demoInputChannel", poller = @Poller(maxMessagesPerPoll = "-1", fixedRate = "1000"))
        public void handle(List<Long> in) throws InterruptedException {
            System.out.println("payload "   in);
            in.forEach(item -> System.out.println("item "   item));
        }

    }

}
 

Результат выглядит следующим образом:

 payload [0]
item 0
payload [1]
item 1
payload [2]
item 2
payload [3]
item 3
payload [4]
item 4
payload [5]
item 5
payload [6]
item 6
payload [7]
item 7
payload [8]
item 8
payload [9]
item 9
 

ОБНОВЛЕНИЕ 2

 @SpringJUnitConfig
public class So69573293Tests {

    @Autowired
    MessageChannel inputChannel;

    @Test
    void testConsumptionFromQueue() throws InterruptedException {
        List<Long> payload = LongStream.range(0, 10).boxed().collect(Collectors.toList());
        this.inputChannel.send(new GenericMessage<>(payload));

        Thread.sleep(1000);
    }

    @Configuration
    @EnableIntegration
    public static class Config {

        @Splitter(inputChannel = "inputChannel", outputChannel = "demoInputChannel")
        public List<Message<Long>> splitterHandle(Message<List<Long>> message) {
            List<Message<Long>> myList = new ArrayList<>();
            List<Long> idList = message.getPayload();
            for (Long l : idList) {
                myList.add(MessageBuilder.withPayload(l).build());
            }
            return myList;
        }


        @Bean
        QueueChannel demoInputChannel() {
            return new QueueChannel(10);
        }

        @ServiceActivator(inputChannel = "demoInputChannel", poller = @Poller(maxMessagesPerPoll = "-1", fixedRate = "1000"))
        public void handle(Message<Long> in) throws InterruptedException {
            System.out.println("payload "   in);
        }

    }

}
 

Выход:

 payload GenericMessage [payload=0, headers={sequenceNumber=1, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=d4715f90-c320-5555-78c2-cba815cdb2d2, sequenceSize=10, timestamp=1634243521339}]
payload GenericMessage [payload=1, headers={sequenceNumber=2, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=31e7f76f-b9e9-f53a-e5d4-82d620d6657f, sequenceSize=10, timestamp=1634243521339}]
payload GenericMessage [payload=2, headers={sequenceNumber=3, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=9083ae5d-60d6-7800-9a58-75eb3de17e56, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=3, headers={sequenceNumber=4, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=ec797609-9187-08ae-977b-7e11d7e451fc, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=4, headers={sequenceNumber=5, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=3d301b78-e6c1-9067-a644-7c951d0e484c, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=5, headers={sequenceNumber=6, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=1ec9223f-9060-0c8f-9b6f-1a3ef5a1d495, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=6, headers={sequenceNumber=7, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=b0231d74-6721-3f41-dc49-b8ce3179229e, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=7, headers={sequenceNumber=8, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=bd7508d9-429e-37f0-2353-3148008ca0a2, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=8, headers={sequenceNumber=9, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=1fe4625e-8807-e7c7-82a6-8867a6400790, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=9, headers={sequenceNumber=10, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=63bd12fb-0774-e187-a76f-f3936f71d7d4, sequenceSize=10, timestamp=1634243521340}]
 

Комментарии:

1. Спасибо Вам за ответ. Я не знаю почему, но это мне не помогает. Все еще не срабатывает. Если я удалю атрибут очереди из канала, он будет работать, но одновременно отправит 50 сообщений на канал demoutputchannel, а это не то, что я хочу. Мне нужно отправлять сообщения одно за другим с канала вывода на канал вывода. Я хотел использовать очередь для этого, но теперь на канал demoutputchannel не поступает никаких сообщений.

2. Ну, тогда происходит что-то еще. То, что вы объясняете, неверно для активатора службы. Он вызывается для каждого отдельного сообщения, извлеченного из очереди, и отвечает одним сообщением. Если только вы не сделаете какую-то пользовательскую логику. Даже если это так max-messages-per-poll="-1" , активатор службы по-прежнему вызывается только для одного сообщения за раз. Оно не приходит для следующего сообщения, пока не будет обработано текущее. Однако это происходит в том же потоке, поэтому вы можете наблюдать некоторые мертвые блокировки, так как у вас есть fixed-delay="0" . Некоторые другие процессы также могут использовать тот же опросник.

3. Я отредактировал свой вопрос. Я изменил канал вывода на Java-код и дал ему специальный опросник. Но все равно то же самое. Кстати, позвольте мне добавить еще кое-что. Я использую разделитель, и этот разделитель передает сообщения в канал очереди. Может ли это вызвать проблему?

4. В вашем списке этого метода всегда будет только одно сообщение. Средство опроса не собирает сообщения из очереди в пакеты.

5. Ну, моя беда: он даже не может привести полезную нагрузку к этому Message<Long> , так как список может быть только типа полезной нагрузки.