Файл выборки Spring Sftp с использованием исходящего шлюза в обработчике сообщений входящего адаптера

#java #spring-boot #spring-integration #spring-integration-dsl #spring-integration-sftp

#java #spring-boot #spring-интеграция #spring-интеграция-dsl #spring-интеграция-sftp

Вопрос:

Я использую входящий адаптер с использованием Java DSL для опроса PDF-файлов с SFTP-сервера. У меня есть вариант использования, когда после извлечения файла pdf приложение получит файл конфигурации, представленный в формате CSV с тем же именем на SFTP-сервере. После извлечения файла конфигурации приложение обработает исходный файл pdf, используя свойства, определенные в файле конфигурации, и загрузит его обратно на SFTP-сервер с использованием исходящего адаптера.

Я сталкиваюсь с проблемами при извлечении файлов конфигурации в обработчике в том же потоке с использованием исходящего шлюза.

Вот мой код :

Регистрируйте потоки интеграции:

   for (String client : clientsArr) {
      this.flowContext.registration(getInboundIntegrationFlow(client)).register();
  }

  this.flowContext.registration(getOutboundIntegrationFlow()).register();
  this.flowContext.registration(sftpGatewayGetIntegrationFlow()).register();
  

Поток интеграции входящего адаптера:

 
  @Autowired
  private SftpPdfMessageHandler messageHandler;

  private IntegrationFlow getInboundIntegrationFlow(String client) {

    String remoteDirectory = getRemoteDirectory(client);
    String localDirectory = getLocalDirectory(client);
    String inboundAdapterId = getInboundAdapterId(client);

    return IntegrationFlows
        .from(Sftp.inboundAdapter(sftpSessionFactory())
                .preserveTimestamp(true)
                .remoteDirectory(remoteDirectory)
                .autoCreateLocalDirectory(true)
                .localDirectory(new File(localDirectory))
                .maxFetchSize(Integer.parseInt(sftpProperties.getMaxFetchSize()))
                .filter(new SftpSimplePatternFileListFilter(sftpProperties.getRemoteFileFilter()))
                .deleteRemoteFiles(true),
            e -> e.id(inboundAdapterId)
                .autoStartup(true)
                .poller(Pollers
                    .fixedDelay(Long.parseLong(sftpProperties.getPollPeriodInSeconds()), TimeUnit.SECONDS)
                    .receiveTimeout(Long.parseLong(sftpProperties.getPollerTimeout()))
                    .maxMessagesPerPoll(Long.parseLong(sftpProperties.getMaxMessagesPerPoll()))
                ))
        .handle(inBoundHandler())
        .get();
  }

  public MessageHandler inBoundHandler() {
    return message -> {
      File file = (File) message.getPayload();
      messageHandler.handleMessage(file);
    };
  }
  

Поток интеграции исходящего адаптера:

   private IntegrationFlow getOutboundIntegrationFlow() {

    return IntegrationFlows.from("sftpOutboundChannel")
        .handle(Sftp.outboundAdapter(sftpSessionFactory(), FileExistsMode.FAIL)
            .remoteDirectoryExpression(String.format("headers['%s']", FileHeaders.REMOTE_DIRECTORY))).get();
  }

  @Bean("sftpOutboundChannel")
  public MessageChannel sftpOutboundChannel() {
    return new DirectChannel();
  }
  

Обработчик сообщений SFTP:

   @Async("sftpHandlerAsyncExecutor")
  public void handleMessage(File originalFile) {

    File configFile = fetchConfigFile();

    /*
      process original file and store processed file in output file path on local directory
     */
      
    boolean success = uploadFileToSftpServer(outputFilePath, client, entity);

    if (success) {
      deleteFileFromLocal(originalFile);
    }
  }
  

Исходящий шлюз ПОЛУЧАЕТ поток интеграции:

   private IntegrationFlow sftpGatewayGetIntegrationFlow() {
    return IntegrationFlows.from("sftpGetInputChannel")
        .handle(Sftp.outboundGateway(sftpSessionFactory(),
            AbstractRemoteFileOutboundGateway.Command.GET, "payload")
            .options(AbstractRemoteFileOutboundGateway.Option.DELETE,
                AbstractRemoteFileOutboundGateway.Option.PRESERVE_TIMESTAMP)
            .localDirectoryExpression(String.format("headers['%s']", Constants.HEADER_LOCAL_DIRECTORY_NAME))
            .autoCreateLocalDirectory(true))
        .channel("nullChannel")
        .get();
  }

  @Bean("sftpGetInputChannel")
  public MessageChannel sftpGetInputChannel() {
    return new DirectChannel();
  }
  

messageHandler.handleMessage() метод вызывается в асинхронном режиме (с помощью ThreadPoolTaskExecutor), который внутренне извлекает файл конфигурации с использованием исходящего шлюза. Но я не смог найти ни одного канала, по которому я мог бы отправлять и получать полезную нагрузку сообщения в одном потоке. Я нашел MessagingTemplate в spring docs, но не смог найти способ подключить это к моему потоку интеграции исходящего шлюза.

sftpGetMessageTemplate.sendAndReceive(sftpGetInputChannel, new GenericMessage<>("/dir/file.csv", headers)) выдает исключение «У диспетчера нет подписчиков на канал» с помощью DirectChannel().

Я ищу решение, при котором я могу извлечь требуемый файл с сервера любым из следующих способов :

  • Интегрируйте MessagingTemplate с IntegrationFlow (если возможно), используя соответствующий канал.
  • Некоторая цепочка обработчиков сообщений во входящем потоке адаптера, где после опроса исходного файла он извлекает другой файл с помощью sftp outbound gateway, а затем вызывает конечный обработчик с обоими объектами (исходным файлом и файлом конфигурации). Я пытаюсь добиться аналогичной вещи, используя пользовательский код выше.
  • Любой другой способ использования каналов отправки и опроса для команды GET в многопоточной среде.

Приложению необходимо определить путь к каталогу во время выполнения при использовании команды GET.

Ответ №1:

Вероятно, вам нужно узнать, что такое @MessagingGateway и как заставить его взаимодействовать с каналами на вашем исходящем шлюзе.

Смотрите документы для получения дополнительной информации: https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/messaging-endpoints.html#gateway

Если вы действительно хотите получить в результате файл конфигурации, вам не следует этого делать .channel("nullChannel") . Когда шлюз будет в руках, появится replyChannel заголовок с TemporaryReplyChannel экземпляром, заполненным шлюзом. Затем в вашем коде вы просто собираетесь использовать этот функциональный интерфейс в качестве API для вызова.

Фактически, этот шлюз обмена сообщениями использует упомянутый MessagingTemplate.sendAndReceive() .

Комментарии:

1. Спасибо @Artem, шлюз обмена сообщениями полезен, и я могу отправлять и получать сообщения с его помощью. Он также работает с template.sendAndReceive после удаления «nullChannel».