#spring-integration #spring-integration-dsl
Вопрос:
В моем проекте у меня есть следующие List
AccountWithFiles
объекты.
@AllArgsConstructor
@Getter
class AccountWithFiles {
private String account;
private List<S3FileInfo> s3FileInfoList;
}
Я хочу обработать каждую AccountWithFiles
отдельно в новом потоке. Затем разделите s3FileInfoList
split()
их и обработайте один за другим с задержкой в 20 минут, однако параллельно account
с каждым s3FileInfoList
.
Итак, у меня есть следующее определение DSL:
@Bean
public IntegrationFlow s3DownloadFlowEnhanced() {
return IntegrationFlows.fromSupplier(s3FileInfoRepository::findAllGroupByAccount,
c -> c.poller(Pollers.cron(time, TimeZone.getTimeZone(zone))).id("s3TEMPO"))
.channel("manualS3EnhancedFlow")
.split()
.channel("myChannel")
.get();
}
s3FileInfoRepository::findAllGroupByAccount
возвращает список AccountWithFiles
объектов, после чего я разделяю их и отправляю в MessageChannels
Executors
канал (с определенным количеством потоков).
@Bean
public MessageChannel myChannel() {
return MessageChannels.publishSubscribe(Executors.newFixedThreadPool(10)).get();
}
После этого
@Bean
public IntegrationFlow processEachAccountSeparately() {
return IntegrationFlows.from("myChannel")
.<AccountWithFiles, Object>transform(m -> m.getS3FileInfoList().stream().sorted(
Comparator.comparing(i -> i.getOperationType() == FILE_OPERATION_TYPE.ADD))
.collect(Collectors.toList()))
.log()
//.resequence()
.split()
.channel("bookItemsChannel")
.get();
}
@Bean
public PollableChannel bookItemsChannel(){
return new QueueChannel();
}
@Bean
public IntegrationFlow test() {
return IntegrationFlows.from("bookItemsChannel")
.delay("delayer.messageGroupId", d -> d
.defaultDelay(25000L)
.delayExpression("headers['delay']"))
.log()
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);
taskExecutor.initialize();
pollerMetadata.setTaskExecutor(taskExecutor);
pollerMetadata.setTrigger(new PeriodicTrigger(15000L));
pollerMetadata.setMaxMessagesPerPoll(3);
return pollerMetadata;
}
Когда сообщения принимаются по Опрашиваемому каналу, они обрабатываются одно за другим с задержкой. Я хочу, чтобы мои сообщения обрабатывались одно за другим, однако параллельно на основе разветвителя из s3DownloadFlowEnhanced
потока.
Я знаю, что опрашиваемые каналы различают отправителя и получателя сообщения в другом потоке. Может быть, здесь есть какой-нибудь обходной путь?
В processEachAccountSeparately
потоке я вижу, что у каждой учетной записи есть свой собственный поток.
2021-06-24 15:33:34.585 INFO 56174 --- [pool-4-thread-1] o.s.integration.handler.LoggingHandler : GenericMessage [payload=[S3FileInfo(fileName=sdfsdf, timeStamp=null, serviceName=null, accountLogin=login2, operationType=ADD)], headers={sequenceNumber=1, correlationId=36b132ac-7c5b-af66-96c0-2334a757c960, id=3bafdaa7-ed4c-087f-5f2c-cc114eae42cd, sequenceSize=2, timestamp=1624538014577}]
2021-06-24 15:33:34.585 INFO 56174 --- [pool-4-thread-2] o.s.integration.handler.LoggingHandler : GenericMessage [payload=[S3FileInfo(fileName=sdfjsjfj, timeStamp=null, serviceName=null, accountLogin=login1, operationType=DELETE), S3FileInfo(fileName=s3/outgoing/file2, timeStamp=null, serviceName=IPASS, accountLogin=login1, operationType=DELETE), S3FileInfo(fileName=outgoing/s3/ipass.xlsx, timeStamp=null, serviceName=IPASS, accountLogin=login1, operationType=ADD), S3FileInfo(fileName=dsfsdf, timeStamp=null, serviceName=null, accountLogin=login1, operationType=ADD)], headers={sequenceNumber=2, correlationId=36b132ac-7c5b-af66-96c0-2334a757c960, id=d8506721-2cfd-b6da-d353-4fb8bd5744fb, sequenceSize=2, timestamp=1624538014577}]
However, PollableChannel executes it one by one
2021-06-24 15:33:46.328 INFO 56174 --- [lTaskExecutor-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=S3FileInfo(fileName=sdfjsjfj, timeStamp=null, serviceName=null, accountLogin=login1, operationType=DELETE), headers={sequenceNumber=1, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 2, 2]], correlationId=d8506721-2cfd-b6da-d353-4fb8bd5744fb, id=7f8bd9a6-25ce-0bb2-c3f3-581d823d8fce, sequenceSize=4, timestamp=1624538014585}]
2021-06-24 15:33:46.329 INFO 56174 --- [lTaskExecutor-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=S3FileInfo(fileName=sdfsdf, timeStamp=null, serviceName=null, accountLogin=login2, operationType=ADD), headers={sequenceNumber=1, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 1, 2]], correlationId=3bafdaa7-ed4c-087f-5f2c-cc114eae42cd, id=f697b52b-1053-51aa-232f-88bb602dc1c9, sequenceSize=1, timestamp=1624538014585}]
2021-06-24 15:33:46.329 INFO 56174 --- [lTaskExecutor-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=S3FileInfo(fileName=s3/outgoing/file2, timeStamp=null, serviceName=IPASS, accountLogin=login1, operationType=DELETE), headers={sequenceNumber=2, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 2, 2]], correlationId=d8506721-2cfd-b6da-d353-4fb8bd5744fb, id=a6754c98-fce0-f132-664a-65d61f553ae2, sequenceSize=4, timestamp=1624538014585}]
2021-06-24 15:34:01.333 INFO 56174 --- [lTaskExecutor-4] o.s.integration.handler.LoggingHandler : GenericMessage [payload=S3FileInfo(fileName=outgoing/s3/ipass.xlsx, timeStamp=null, serviceName=IPASS, accountLogin=login1, operationType=ADD), headers={sequenceNumber=3, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 2, 2]], correlationId=d8506721-2cfd-b6da-d353-4fb8bd5744fb, id=71fa915a-fcaa-3d00-023b-5cf51be3b183, sequenceSize=4, timestamp=1624538014585}]
2021-06-24 15:34:01.333 INFO 56174 --- [lTaskExecutor-4] o.s.integration.handler.LoggingHandler : GenericMessage [payload=S3FileInfo(fileName=dsfsdf, timeStamp=null, serviceName=null, accountLogin=login1, operationType=ADD), headers={sequenceNumber=4, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 2, 2]], correlationId=d8506721-2cfd-b6da-d353-4fb8bd5744fb, id=7c513e23-5484-4f61-b7d3-362648c7b89c, sequenceSize=4, timestamp=1624538014585}]
Чего я хочу, так это иметь что-то вроде этого:
[pool-4-thread-1] simultaneously
[pool-4-thread-2] simultaneously
[pool-4-thread-2] 20 min delay
[pool-4-thread-2] 20 min delay
[pool-4-thread-2] 20 min delay
Ответ №1:
Ваш первый шаг заключается в правильном разбиении списка AccountWithFiles
. Затем вы говорите, что хотели бы разделить s3FileInfoList
и обработать их последовательно, один за другим, но почему вы помещаете их в a QeueuChannel
? В DirectChannel
этом случае достаточно обычного значения по умолчанию.
Однако затем вы переходите к delay()
тому, что не блокирует текущий поток, а планирует задачу в будущем в отдельном потоке. Таким образом, вам, вероятно, необходимо пересмотреть свое решение, поскольку при нынешнем подходе, даже если вы избавитесь от этого bookItemsChannel
в виде очереди, вы все равно не будете последовательно обрабатывать отложенные сообщения. Просто нет никакой гарантии TaskScheduler
, какой из них будет выполнен первым, когда у всех будет одинаковое время по расписанию.
Комментарии:
1. Спасибо за быстрый ответ. Возможно ли сделать что-то подобное
account1-> S3FileInfo (delay) -> S3FileInfo (delay). account2-> S3FileInfo (delay) -> S3FileInfo (delay)
? Таким образом, учетные записи 1 и 2 обрабатываются параллельно, но с задержкой. Честно говоря, я забыл избавитьсяdelay()
Poller
от того, что вместо этого использовал.2. Невозможно добиться точной задержки с опрашиваемым каналом, так как ваше сообщение может появиться в очереди в середине текущего цикла опроса. Кроме того, как вы заметили, все сообщения опрашиваются одним и тем же потоком на стороне потребителя в опрашиваемом канале. В любом случае: невозможно выполнить обработку в одном потоке после задержки. Вам, вероятно, может потребоваться подумать о том, чтобы отложить весь
AccountWithFiles
объект, а затемgetS3FileInfoList()
последовательно разделить его.