#java #spring-boot #spring-integration
Вопрос:
Я бы хотел, чтобы поток данных через пару раз шел на сервер sftp. Я использую интеграцию spring boot. Я настроил SftpRemoteFileTemplate вот так
@Autowired private SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory;
@Bean
public SftpRemoteFileTemplate sftpRemoteFileTemplate() {
final SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(sftpSessionFactory);
template.setRemoteDirectoryExpression(
new LiteralExpression(contactSenderSftpProperties.getSftpSessionProperties().getBaseSftpPath()));
template.setTemporaryFileSuffix(".tmp");
return template;
}
но мой файл назначения не добавляется, а перезаписывается последним содержимым временного файла, в который я отправляю данные.
Мой писатель выглядит так
public void write(List<? extends Item> items) throws Exception {
log.debug("Write {}", items);
final int timeoutSeconds = 60;
try (PipedInputStream pipedInputStream = new PipedInputStream()) {
log.debug("Preparing to write...");
final CountDownLatch countDownLatch = new CountDownLatch(1);
writerClient.write(items, pipedInputStream, countDownLatch);
if (!countDownLatch.await(timeoutSeconds, TimeUnit.SECONDS)) {
throw new TimeoutException("Operation stream not connected");
}
sftpRemoteFileTemplate.send(
MessageBuilder.withPayload(pipedInputStream).setHeader(FileHeaders.FILENAME, "contacts.csv").build());
}
}
где метод WriterClient#запись
@Async("writerThreadPoolTaskExecutor")
public void write(List<? extends Item> items, PipedInputStream pipedInputStream, CountDownLatch countDownLatch) throws IOException {
try(final PipedOutputStream pipedOutputStream = new PipedOutputStream(pipedInputStream)){
countDownLatch.countDown();
csvSerializer.serialize(pipedOutputStream, items.stream());
}
}
и writerThreadPoolTaskExecutor
@Bean
public ThreadPoolTaskExecutor writerThreadPoolTaskExecutor(TaskExecutorBuilder taskExecutorBuilder) {
return taskExecutorBuilder
.corePoolSize(properties.getWriterThreadPoolCorePoolSize())
.maxPoolSize(properties.getWriterThreadPoolMaxPoolSize())
.queueCapacity(properties.getWriterThreadPoolQueueCapacity())
.threadNamePrefix("writer-task-thread")
.build();
}
В двух словах, я хотел бы написать много небольших временных файлов и объединить их в один, содержащий все данные. Я не совсем уверен в PipedInput/OutputStream. Можно ли добавлять поток PipedOutputStream много раз и загружать поток PipedInputStream только один раз в sftp, когда больше нет данных для записи ? Но тогда возникает вопрос, как мне узнать, была ли написана вся дата ?
Комментарии:
1. sftp действительно имеет функцию добавления, поэтому теоретически это должно быть возможно
2. Да, это так, но не разрешается добавлять временные файлы.
3. Я мог бы создать один файл в начале, добавить к нему, а затем переместить его в каталог назначения.
4. Да; и
RemoteFileTemplate
уappend()
этого есть метод.5. Я решил не использовать функцию временных имен файлов. Мое решение-записать (с каждым добавлением вызова) файл во временный каталог на sftp-сервере. Когда больше нет данных для загрузки, переместите файл в каталог назначения.
Ответ №1:
Взгляните на адаптер исходящего канала SFTP и его APPEND
режим: https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#spel-and-the-sftp-outbound-adapter
Ответ №2:
Используйте append()
метод на RemoteFileTemplate
.
/**
* Send a file to a remote server, based on information in a message, appending.
*
* @param message The message.
* @return The remote path, or null if no local file was found.
* @since 4.1
*/
String append(Message<?> message);
Реализация выглядит следующим образом
@Override
public String append(Message<?> message, String subDirectory) {
return send(message, subDirectory, FileExistsMode.APPEND);
}