Вывод по трубопроводу/загрузка потока ввода в sftp с использованием интеграции spring

#java #spring-boot #spring-integration

Вопрос:

Я бы хотел, чтобы поток данных через пару раз шел на сервер sftp. Я использую интеграцию spring boot. Я настроил SftpRemoteFileTemplate вот так

   @Autowired private SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory;

  @Bean
  public SftpRemoteFileTemplate sftpRemoteFileTemplate() {
    final SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(sftpSessionFactory);

    template.setRemoteDirectoryExpression(
            new LiteralExpression(contactSenderSftpProperties.getSftpSessionProperties().getBaseSftpPath()));

    template.setTemporaryFileSuffix(".tmp");

    return template;
  }
 

но мой файл назначения не добавляется, а перезаписывается последним содержимым временного файла, в который я отправляю данные.

Мой писатель выглядит так

   public void write(List<? extends Item> items) throws Exception {
    log.debug("Write {}", items);

    final int timeoutSeconds = 60;

    try (PipedInputStream pipedInputStream = new PipedInputStream()) {
      log.debug("Preparing to write...");
      final CountDownLatch countDownLatch = new CountDownLatch(1);

      writerClient.write(items, pipedInputStream, countDownLatch);

      if (!countDownLatch.await(timeoutSeconds, TimeUnit.SECONDS)) {
        throw new TimeoutException("Operation stream not connected");
      }

      sftpRemoteFileTemplate.send(
              MessageBuilder.withPayload(pipedInputStream).setHeader(FileHeaders.FILENAME, "contacts.csv").build());

    }
  }
 

где метод WriterClient#запись

   @Async("writerThreadPoolTaskExecutor")
  public void write(List<? extends Item> items, PipedInputStream pipedInputStream, CountDownLatch countDownLatch) throws IOException {
    try(final PipedOutputStream pipedOutputStream = new PipedOutputStream(pipedInputStream)){
      countDownLatch.countDown();
      csvSerializer.serialize(pipedOutputStream, items.stream());
    }
  }
 

и writerThreadPoolTaskExecutor

   @Bean
  public ThreadPoolTaskExecutor writerThreadPoolTaskExecutor(TaskExecutorBuilder taskExecutorBuilder) {
    return taskExecutorBuilder
            .corePoolSize(properties.getWriterThreadPoolCorePoolSize())
            .maxPoolSize(properties.getWriterThreadPoolMaxPoolSize())
            .queueCapacity(properties.getWriterThreadPoolQueueCapacity())
            .threadNamePrefix("writer-task-thread")
            .build();
  }
 

В двух словах, я хотел бы написать много небольших временных файлов и объединить их в один, содержащий все данные. Я не совсем уверен в PipedInput/OutputStream. Можно ли добавлять поток PipedOutputStream много раз и загружать поток PipedInputStream только один раз в sftp, когда больше нет данных для записи ? Но тогда возникает вопрос, как мне узнать, была ли написана вся дата ?

Комментарии:

1. sftp действительно имеет функцию добавления, поэтому теоретически это должно быть возможно

2. Да, это так, но не разрешается добавлять временные файлы.

3. Я мог бы создать один файл в начале, добавить к нему, а затем переместить его в каталог назначения.

4. Да; и RemoteFileTemplate у append() этого есть метод.

5. Я решил не использовать функцию временных имен файлов. Мое решение-записать (с каждым добавлением вызова) файл во временный каталог на sftp-сервере. Когда больше нет данных для загрузки, переместите файл в каталог назначения.

Ответ №1:

Взгляните на адаптер исходящего канала SFTP и его APPEND режим: https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#spel-and-the-sftp-outbound-adapter

Ответ №2:

Используйте append() метод на RemoteFileTemplate .

     /**
     * Send a file to a remote server, based on information in a message, appending.
     *
     * @param message The message.
     * @return The remote path, or null if no local file was found.
     * @since 4.1
     */
    String append(Message<?> message);
 

Реализация выглядит следующим образом

     @Override
    public String append(Message<?> message, String subDirectory) {
        return send(message, subDirectory, FileExistsMode.APPEND);
    }