#java #spring #spring-integration
Вопрос:
Я изо всех сил пытаюсь обработать сообщения, которые находятся в канале очереди. Активатор службы запускается не сразу. Мне нужно как можно скорее сработать, когда сообщение придет на канал демоинпутешествия.
У меня есть канал, который является каналом очереди.
<int:channel id="demoInputChannel">
<int:queue/>
</int:channel>
У меня есть активатор службы для обработки сообщений, которые находятся в демоинпутчанале. Это такое определение, как это:
<int:service-activator input-channel="demoInputChannel"
output-channel="demoOutputChannel"
ref="demoService"
method="demoMethod"
requires-reply="true">
У меня также есть опросник, который определен глобально в проекте:
<int:poller default="true" fixed-delay="0" max-messages-per-poll="1"/>
Как можно немедленно запустить активатор службы?
ПРАВКА: Я изменил канал вывода на такой, чтобы убедиться, что он использует специальный опросник.
@Bean
QueueChannel demoInputChannel() {
return new QueueChannel(10);
}
@ServiceActivator(inputChannel = "demoInputChannel", outputChannel = "demoOutputChannel")
public Message<List<DataElement>> handle(Message<Long> in) throws Exception {
System.out.println("item.getPayload " in.getPayload());
return autowiredObject.autowiredObjectMethod(in);
}
ПРАВКА 2:
Вот мой метод разделения:
@Splitter(inputChannel = "splitterInputChannel", outputChannel = "demoInputChannel")
public List<Message<Long>> splitterHandle(Message<?> message) {
List<Message<Long>> myList = new ArrayList<>();
List<Long> idList = (List<Long>) message.getPayload();
for (Long l : idList) {
myList.add(org.springframework.integration.support.MessageBuilder.withPayload(l).build());
}
return myList;
}
Ответ №1:
Не делай max-messages-per-poll="1"
этого . Используйте -1
, чтобы всегда обрабатывать сообщения из очереди немедленно, когда они там находятся. Если сообщений нет, задача опроса перейдет в спящий режим на этот fixed-delay
тайм-аут. Но поскольку все распределено между разными потоками, действительно будет некоторая задержка.
Смотрите дополнительную информацию в документах: https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-adapter-namespace-inbound. Этот mportant: Poller Configuration
абзац.
Обновить
Вот модульный тест, демонстрирующий, что все работает так, как ожидалось:
@SpringJUnitConfig
public class So69573293Tests {
@Autowired
MessageChannel inputChannel;
@Test
void testConsumptionFromQueue() throws InterruptedException {
String payload = IntStream.range(0, 10).mapToObj(Integer::toString).collect(Collectors.joining(","));
this.inputChannel.send(new GenericMessage<>(payload));
Thread.sleep(1000);
}
@Configuration
@EnableIntegration
public static class Config {
@Splitter(inputChannel = "inputChannel", outputChannel = "demoInputChannel")
List<Long> split(String payload) {
return Arrays.stream(payload.split(","))
.map(Long::parseLong)
.collect(Collectors.toList());
}
@Bean
QueueChannel demoInputChannel() {
return new QueueChannel(10);
}
@ServiceActivator(inputChannel = "demoInputChannel", poller = @Poller(maxMessagesPerPoll = "-1", fixedRate = "1000"))
public void handle(List<Long> in) throws InterruptedException {
System.out.println("payload " in);
in.forEach(item -> System.out.println("item " item));
}
}
}
Результат выглядит следующим образом:
payload [0]
item 0
payload [1]
item 1
payload [2]
item 2
payload [3]
item 3
payload [4]
item 4
payload [5]
item 5
payload [6]
item 6
payload [7]
item 7
payload [8]
item 8
payload [9]
item 9
ОБНОВЛЕНИЕ 2
@SpringJUnitConfig
public class So69573293Tests {
@Autowired
MessageChannel inputChannel;
@Test
void testConsumptionFromQueue() throws InterruptedException {
List<Long> payload = LongStream.range(0, 10).boxed().collect(Collectors.toList());
this.inputChannel.send(new GenericMessage<>(payload));
Thread.sleep(1000);
}
@Configuration
@EnableIntegration
public static class Config {
@Splitter(inputChannel = "inputChannel", outputChannel = "demoInputChannel")
public List<Message<Long>> splitterHandle(Message<List<Long>> message) {
List<Message<Long>> myList = new ArrayList<>();
List<Long> idList = message.getPayload();
for (Long l : idList) {
myList.add(MessageBuilder.withPayload(l).build());
}
return myList;
}
@Bean
QueueChannel demoInputChannel() {
return new QueueChannel(10);
}
@ServiceActivator(inputChannel = "demoInputChannel", poller = @Poller(maxMessagesPerPoll = "-1", fixedRate = "1000"))
public void handle(Message<Long> in) throws InterruptedException {
System.out.println("payload " in);
}
}
}
Выход:
payload GenericMessage [payload=0, headers={sequenceNumber=1, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=d4715f90-c320-5555-78c2-cba815cdb2d2, sequenceSize=10, timestamp=1634243521339}]
payload GenericMessage [payload=1, headers={sequenceNumber=2, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=31e7f76f-b9e9-f53a-e5d4-82d620d6657f, sequenceSize=10, timestamp=1634243521339}]
payload GenericMessage [payload=2, headers={sequenceNumber=3, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=9083ae5d-60d6-7800-9a58-75eb3de17e56, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=3, headers={sequenceNumber=4, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=ec797609-9187-08ae-977b-7e11d7e451fc, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=4, headers={sequenceNumber=5, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=3d301b78-e6c1-9067-a644-7c951d0e484c, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=5, headers={sequenceNumber=6, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=1ec9223f-9060-0c8f-9b6f-1a3ef5a1d495, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=6, headers={sequenceNumber=7, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=b0231d74-6721-3f41-dc49-b8ce3179229e, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=7, headers={sequenceNumber=8, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=bd7508d9-429e-37f0-2353-3148008ca0a2, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=8, headers={sequenceNumber=9, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=1fe4625e-8807-e7c7-82a6-8867a6400790, sequenceSize=10, timestamp=1634243521340}]
payload GenericMessage [payload=9, headers={sequenceNumber=10, correlationId=63f1a3ee-b84a-c7bc-b50e-71a96b8342db, id=63bd12fb-0774-e187-a76f-f3936f71d7d4, sequenceSize=10, timestamp=1634243521340}]
Комментарии:
1. Спасибо Вам за ответ. Я не знаю почему, но это мне не помогает. Все еще не срабатывает. Если я удалю атрибут очереди из канала, он будет работать, но одновременно отправит 50 сообщений на канал demoutputchannel, а это не то, что я хочу. Мне нужно отправлять сообщения одно за другим с канала вывода на канал вывода. Я хотел использовать очередь для этого, но теперь на канал demoutputchannel не поступает никаких сообщений.
2. Ну, тогда происходит что-то еще. То, что вы объясняете, неверно для активатора службы. Он вызывается для каждого отдельного сообщения, извлеченного из очереди, и отвечает одним сообщением. Если только вы не сделаете какую-то пользовательскую логику. Даже если это так
max-messages-per-poll="-1"
, активатор службы по-прежнему вызывается только для одного сообщения за раз. Оно не приходит для следующего сообщения, пока не будет обработано текущее. Однако это происходит в том же потоке, поэтому вы можете наблюдать некоторые мертвые блокировки, так как у вас естьfixed-delay="0"
. Некоторые другие процессы также могут использовать тот же опросник.3. Я отредактировал свой вопрос. Я изменил канал вывода на Java-код и дал ему специальный опросник. Но все равно то же самое. Кстати, позвольте мне добавить еще кое-что. Я использую разделитель, и этот разделитель передает сообщения в канал очереди. Может ли это вызвать проблему?
4. В вашем списке этого метода всегда будет только одно сообщение. Средство опроса не собирает сообщения из очереди в пакеты.
5. Ну, моя беда: он даже не может привести полезную нагрузку к этому
Message<Long>
, так как список может быть только типа полезной нагрузки.