#spring #ftp #stream #spring-integration
#spring #ftp #поток #spring-интеграция
Вопрос:
У меня есть следующий код, который работает нормально, извлекая файлы с FTP-сервера в поток, но мне нужно получить строку каждого файла, кажется, мне нужно использовать преобразователь, передающий кодировку, но чего мне не хватает? Как именно получить строку содержимого каждого переданного файла?
Заранее большое спасибо
@SpringBootApplication
@EnableIntegration
public class FtpinboundApp extends SpringBootServletInitializer implements WebApplicationInitializer {
final static Logger logger = Logger.getLogger(FtpinboundApp.class);
public static void main(String[] args) {
SpringApplication.run(FtpinboundApp.class, args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("X.X.X.X");
sf.setPort(21);
sf.setUsername("xxx");
sf.setPassword("XXX");
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
@ServiceActivator(inputChannel = "stream")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("trasnferred file:" message.getPayload());
}
};
}
@Bean
@InboundChannelAdapter(value = "stream", poller = @Poller(fixedRate = "1000"))
public MessageSource<InputStream> ftpMessageSource() {
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template(), null);
messageSource.setRemoteDirectory("/X/X/X");
messageSource.setFilter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "streaming"));
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public FtpRemoteFileTemplate template() {
return new FtpRemoteFileTemplate(ftpSessionFactory());
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(5000));
return pollerMetadata;
}
}
Ответ №1:
Используйте a StreamTransformer
, чтобы получить весь файл в виде одной строки, или a FileSplitter
, чтобы получить сообщение для каждой строки.
РЕДАКТИРОВАТЬ (конфигурация фильтра)
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template(), null);
messageSource.setRemoteDirectory("ftpSource/");
messageSource.setFilter(filter());
return messageSource;
}
public FileListFilter<FTPFile> filter() {
CompositeFileListFilter<FTPFile> filter = new CompositeFileListFilter<>();
filter.addFilter(new FtpSimplePatternFileListFilter("*.txt"));
filter.addFilter(acceptOnceFilter());
return filter;
}
@Bean
public FtpPersistentAcceptOnceFileListFilter acceptOnceFilter() {
FtpPersistentAcceptOnceFileListFilter filter = new FtpPersistentAcceptOnceFileListFilter(meta(),
"streaming"); // keys will be, e.g. "streamingfoo.txt"
filter.setFlushOnUpdate(true);
return filter;
}
@Bean
public ConcurrentMetadataStore meta() {
PropertiesPersistingMetadataStore meta = new PropertiesPersistingMetadataStore();
meta.setBaseDirectory("/tmp/foo");
meta.setFileName("ftpStream.properties");
return meta;
}
РЕДАКТИРОВАТЬ 2 — удалить удаленный файл с рекомендациями
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
Комментарии:
1. Гэри, спасибо, да, на самом деле у меня есть: @Bean @Transformer (входной канал = «поток», выходной канал = «данные») общедоступная организация.springframework.integration. трансформатор. Transformer трансформатор() { возвращает новый StreamTransformer(«UTF-8»); } но я не совсем уверен, где его использовать в коде.
2. В настоящее время у вас есть подписка на активатор службы и transformer
stream
— это означает, что сообщения будут отправляться поочередно каждому — вам необходимо подписаться на активатор службы на выходной канал transformer.3. О! Я вижу. Я понял это сейчас! спасибо, Гэри, пользуясь вашей щедростью, не могли бы вы сообщить мне, как настроить фильтр источника потокового сообщения так, чтобы он получал только определенные файлы с сервера? Я не могу найти что-то понятное (для меня) в документации, и это кажется отличным от стандартного FTP inbound channel.
4. Смотрите редактирование моего ответа. Если вы удалите ключ (
prefix filename
) из хранилища или изменится временная метка удаленного файла, файл будет извлечен повторно.5. Вы действительно должны задавать новые вопросы вместо того, чтобы продолжать добавлять новые вопросы к этому. Смотрите мою вторую правку.