Весенняя интеграция одновременно использует канал опроса

#spring-integration #spring-integration-dsl

Вопрос:

В моем проекте у меня есть следующие List AccountWithFiles объекты.

 @AllArgsConstructor
  @Getter
  class AccountWithFiles {

    private String account;
    private List<S3FileInfo> s3FileInfoList;
  }
 

Я хочу обработать каждую AccountWithFiles отдельно в новом потоке. Затем разделите s3FileInfoList split() их и обработайте один за другим с задержкой в 20 минут, однако параллельно account с каждым s3FileInfoList .

Итак, у меня есть следующее определение DSL:

   @Bean
  public IntegrationFlow s3DownloadFlowEnhanced() {
    return IntegrationFlows.fromSupplier(s3FileInfoRepository::findAllGroupByAccount,
        c -> c.poller(Pollers.cron(time, TimeZone.getTimeZone(zone))).id("s3TEMPO"))
        .channel("manualS3EnhancedFlow")
        .split()
        .channel("myChannel")
        .get();
  }
 

s3FileInfoRepository::findAllGroupByAccount возвращает список AccountWithFiles объектов, после чего я разделяю их и отправляю в MessageChannels Executors канал (с определенным количеством потоков).

   @Bean
  public MessageChannel myChannel() {
    return MessageChannels.publishSubscribe(Executors.newFixedThreadPool(10)).get();
  }
 

После этого

  @Bean
  public IntegrationFlow processEachAccountSeparately() {
    return IntegrationFlows.from("myChannel")
        .<AccountWithFiles, Object>transform(m -> m.getS3FileInfoList().stream().sorted(
              Comparator.comparing(i -> i.getOperationType() == FILE_OPERATION_TYPE.ADD))
              .collect(Collectors.toList()))
        .log()
        //.resequence()
        .split()
        .channel("bookItemsChannel")
        .get();
  }
  
  @Bean
  public PollableChannel bookItemsChannel(){
    return new QueueChannel();
  }
  
  @Bean
  public IntegrationFlow test() {
    return IntegrationFlows.from("bookItemsChannel")
        .delay("delayer.messageGroupId", d -> d
            .defaultDelay(25000L)
            .delayExpression("headers['delay']"))
        .log()
        .get();
  }


  @Bean(name = PollerMetadata.DEFAULT_POLLER)
  public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(5);
    taskExecutor.initialize();

    pollerMetadata.setTaskExecutor(taskExecutor);
    pollerMetadata.setTrigger(new PeriodicTrigger(15000L));
    pollerMetadata.setMaxMessagesPerPoll(3);
    return pollerMetadata;
  }
 

Когда сообщения принимаются по Опрашиваемому каналу, они обрабатываются одно за другим с задержкой. Я хочу, чтобы мои сообщения обрабатывались одно за другим, однако параллельно на основе разветвителя из s3DownloadFlowEnhanced потока.
Я знаю, что опрашиваемые каналы различают отправителя и получателя сообщения в другом потоке. Может быть, здесь есть какой-нибудь обходной путь?

В processEachAccountSeparately потоке я вижу, что у каждой учетной записи есть свой собственный поток.

     2021-06-24 15:33:34.585  INFO 56174 --- [pool-4-thread-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=[S3FileInfo(fileName=sdfsdf, timeStamp=null, serviceName=null, accountLogin=login2, operationType=ADD)], headers={sequenceNumber=1, correlationId=36b132ac-7c5b-af66-96c0-2334a757c960, id=3bafdaa7-ed4c-087f-5f2c-cc114eae42cd, sequenceSize=2, timestamp=1624538014577}]
    2021-06-24 15:33:34.585  INFO 56174 --- [pool-4-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=[S3FileInfo(fileName=sdfjsjfj, timeStamp=null, serviceName=null, accountLogin=login1, operationType=DELETE), S3FileInfo(fileName=s3/outgoing/file2, timeStamp=null, serviceName=IPASS, accountLogin=login1, operationType=DELETE), S3FileInfo(fileName=outgoing/s3/ipass.xlsx, timeStamp=null, serviceName=IPASS, accountLogin=login1, operationType=ADD), S3FileInfo(fileName=dsfsdf, timeStamp=null, serviceName=null, accountLogin=login1, operationType=ADD)], headers={sequenceNumber=2, correlationId=36b132ac-7c5b-af66-96c0-2334a757c960, id=d8506721-2cfd-b6da-d353-4fb8bd5744fb, sequenceSize=2, timestamp=1624538014577}]

However, PollableChannel executes it one by one

2021-06-24 15:33:46.328  INFO 56174 --- [lTaskExecutor-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=S3FileInfo(fileName=sdfjsjfj, timeStamp=null, serviceName=null, accountLogin=login1, operationType=DELETE), headers={sequenceNumber=1, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 2, 2]], correlationId=d8506721-2cfd-b6da-d353-4fb8bd5744fb, id=7f8bd9a6-25ce-0bb2-c3f3-581d823d8fce, sequenceSize=4, timestamp=1624538014585}]
2021-06-24 15:33:46.329  INFO 56174 --- [lTaskExecutor-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=S3FileInfo(fileName=sdfsdf, timeStamp=null, serviceName=null, accountLogin=login2, operationType=ADD), headers={sequenceNumber=1, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 1, 2]], correlationId=3bafdaa7-ed4c-087f-5f2c-cc114eae42cd, id=f697b52b-1053-51aa-232f-88bb602dc1c9, sequenceSize=1, timestamp=1624538014585}]
2021-06-24 15:33:46.329  INFO 56174 --- [lTaskExecutor-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=S3FileInfo(fileName=s3/outgoing/file2, timeStamp=null, serviceName=IPASS, accountLogin=login1, operationType=DELETE), headers={sequenceNumber=2, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 2, 2]], correlationId=d8506721-2cfd-b6da-d353-4fb8bd5744fb, id=a6754c98-fce0-f132-664a-65d61f553ae2, sequenceSize=4, timestamp=1624538014585}]
2021-06-24 15:34:01.333  INFO 56174 --- [lTaskExecutor-4] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=S3FileInfo(fileName=outgoing/s3/ipass.xlsx, timeStamp=null, serviceName=IPASS, accountLogin=login1, operationType=ADD), headers={sequenceNumber=3, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 2, 2]], correlationId=d8506721-2cfd-b6da-d353-4fb8bd5744fb, id=71fa915a-fcaa-3d00-023b-5cf51be3b183, sequenceSize=4, timestamp=1624538014585}]
2021-06-24 15:34:01.333  INFO 56174 --- [lTaskExecutor-4] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=S3FileInfo(fileName=dsfsdf, timeStamp=null, serviceName=null, accountLogin=login1, operationType=ADD), headers={sequenceNumber=4, sequenceDetails=[[36b132ac-7c5b-af66-96c0-2334a757c960, 2, 2]], correlationId=d8506721-2cfd-b6da-d353-4fb8bd5744fb, id=7c513e23-5484-4f61-b7d3-362648c7b89c, sequenceSize=4, timestamp=1624538014585}]
 

Чего я хочу, так это иметь что-то вроде этого:

 [pool-4-thread-1] simultaneously 
[pool-4-thread-2] simultaneously
[pool-4-thread-2] 20 min delay
[pool-4-thread-2] 20 min delay
[pool-4-thread-2] 20 min delay
 

Ответ №1:

Ваш первый шаг заключается в правильном разбиении списка AccountWithFiles . Затем вы говорите, что хотели бы разделить s3FileInfoList и обработать их последовательно, один за другим, но почему вы помещаете их в a QeueuChannel ? В DirectChannel этом случае достаточно обычного значения по умолчанию.

Однако затем вы переходите к delay() тому, что не блокирует текущий поток, а планирует задачу в будущем в отдельном потоке. Таким образом, вам, вероятно, необходимо пересмотреть свое решение, поскольку при нынешнем подходе, даже если вы избавитесь от этого bookItemsChannel в виде очереди, вы все равно не будете последовательно обрабатывать отложенные сообщения. Просто нет никакой гарантии TaskScheduler , какой из них будет выполнен первым, когда у всех будет одинаковое время по расписанию.

Комментарии:

1. Спасибо за быстрый ответ. Возможно ли сделать что-то подобное account1-> S3FileInfo (delay) -> S3FileInfo (delay). account2-> S3FileInfo (delay) -> S3FileInfo (delay) ? Таким образом, учетные записи 1 и 2 обрабатываются параллельно, но с задержкой. Честно говоря, я забыл избавиться delay() Poller от того, что вместо этого использовал.

2. Невозможно добиться точной задержки с опрашиваемым каналом, так как ваше сообщение может появиться в очереди в середине текущего цикла опроса. Кроме того, как вы заметили, все сообщения опрашиваются одним и тем же потоком на стороне потребителя в опрашиваемом канале. В любом случае: невозможно выполнить обработку в одном потоке после задержки. Вам, вероятно, может потребоваться подумать о том, чтобы отложить весь AccountWithFiles объект, а затем getS3FileInfoList() последовательно разделить его.