#java #spring #spring-boot #spring-integration
#java #spring #spring-загрузка #spring-интеграция
Вопрос:
Я использую FileReadingMessageSource
в качестве источника сообщения в Spring Integration ESB.
В частности, я хотел бы использовать WatchServiceDirectoryScanner
. Я использую конфигурацию на основе аннотаций с загрузкой spring.
Если я не использую WatchServiceDirectoryScanner
, все в порядке, но когда я устанавливаю setUseWatchService(true)
, я не могу заставить его работать.
Только файл, присутствующий в каталоге при запуске приложения, генерирует новые сообщения в канале. Файлы, скопированные или созданные в каталоге, не генерируют никаких сообщений.
Я хотел бы использовать WatchServiceDirectoryScanner
, поскольку я хочу сгенерировать только сообщение
Это код, в котором я настраиваю канал, адаптер и источник:
@Bean(name="source")
public FileReadingMessageSource getFileMessageSource(){
FileReadingMessageSource lm = new FileReadingMessageSource();
lm.setBeanName("fileMessageSource");
lm.setDirectory(new File("C:/DestDir"));
lm.setAutoCreateDirectory(true);
lm.setFilter(new AcceptAllFileListFilter<>());
lm.setUseWatchService(true);
lm.setWatchEvents(WatchEventType.CREATE);
return lm;
}
@Bean(name="channel")
public PublishSubscribeChannel getFileChannel(){
PublishSubscribeChannel psc = new PublishSubscribeChannel();
psc.setLoggingEnabled(true);
psc.setComponentName("channelexp");
return psc;
}
@Bean
@DependsOn({"source","channel"})
public SourcePollingChannelAdapter getChannelAdapter(){
SourcePollingChannelAdapter spca = new SourcePollingChannelAdapter();
FileReadingMessageSource frms = context.getBean(FileReadingMessageSource.class);
PublishSubscribeChannel psc = context.getBean("channel",PublishSubscribeChannel.class);
spca.setSource(frms);
spca.setOutputChannel(psc);
return spca;
}
Даже если я использую @InboundChannelAdapter
, разницы нет
@Bean(name="source")
@InboundChannelAdapter(value = "channel", poller = @Poller(fixedDelay = "1000"))
public FileReadingMessageSource getFileMessageSource(){
FileReadingMessageSource lm = new FileReadingMessageSource();
lm.setDirectory(new File("C:/fromDir/"));
lm.setFilter(new AcceptAllFileListFilter<>());
lm.setUseWatchService(true);
lm.setWatchEvents(WatchEventType.CREATE);
return lm;
}
Где я делаю неправильно?
Комментарии:
1. Включите ведение журнала ОТЛАДКИ, чтобы наблюдать за событиями просмотра.
Ответ №1:
Я не вижу кода опроса в вашей конфигурации, но мне интересно, есть ли у вас что-нибудь еще:
- Вы должны использовать
@EnableIntegration
SourcePollingChannelAdapter
Должен быть предоставлен сPollerMetadata
- Или просто подумайте об использовании
@InboundChannelAdapter
(см. Справочное руководство) - Это действительно выглядит странно для использования
context.getBean()
во время определения компонента. Вы должны использовать там инъекцию. Первый предназначен для выполнения, а не для фазы инициализации.
Редактировать
Spring Интеграция Java DSL пример:
@SpringBootApplication
public class FileChangeLineSeparator {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = new SpringApplicationBuilder(FileChangeLineSeparator.class)
.web(false)
.run(args);
System.out.println("Put a windows file with a .txt extension in /tmp/in,"
"nthe file will be converted to Un*x and placed in"
"n/tmp/out"
"nnHit enter to terminate");
System.in.read();
context.close();
}
@Bean
@InboundChannelAdapter(value = "channel", poller = @Poller(fixedDelay = "1000"))
public FileReadingMessageSource getFileMessageSource() {
FileReadingMessageSource lm = new FileReadingMessageSource();
lm.setDirectory(new File("/tmp/in"));
lm.setFilter(new AcceptAllFileListFilter<>());
lm.setUseWatchService(true);
lm.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE);
return lm;
}
@Bean
public IntegrationFlow fileToFile() {
return IntegrationFlows.from("channel")
.transform(Transformers.fileToString())
.transform("payload.replaceAll('rn', 'n')")
.handle(Files.outboundAdapter("'/tmp/out'")
.autoCreateDirectory(true))
.get();
}
}
В конечном итоге я создал новые файлы, и они правильно подобраны и обработаны.
Комментарии:
1. Да, я освобождаю, это странно, но я планирую использовать этот код во время выполнения.
2. Что ж, для среды выполнения многое предстоит сделать… Подумайте о том, чтобы изучить функцию Spring Integration Java
IntegrationFlowContext
DSL: spring.io/blog/2016/10/14 /. … Просто прочитайте все связанные сообщения в блогах о новых функциях. Также взгляните на пример github.com/spring-projects/spring-integration-samples/tree /…
Ответ №2:
Даже если ответ @Artem Bilan был очень полезен (вот почему я его принял) Я публикую свой собственный ответ, чтобы показать свое окончательное решение.
Это полный пример
Мне приходилось отслеживать каталог и создавать сообщение каждый раз при изменении файла. Я использую Spring ESB FileReadingMessageSource
. Кроме того, мне пришлось создавать экземпляры всех компонентов во время выполнения, по этой причине я использовал Spring Integration Java DSL IntegrationFlowContext
.
Вот как я инициализировал источник во время выполнения (сообщение отправляется на "output"
канал).
@Autowired
private IntegrationFlowContext intContext;
public void initDispatcher(String name,Long pollingFreq) throws HandlerConfigurationException {
logger.info("Source " name " Inizializing");
IntegrationFlow sourceModifiedFlow = IntegrationFlows
.from(this.getFileModifiedMessageSource(), s -> s.poller(Pollers.fixedRate(pollingFreq, TimeUnit.MILLISECONDS)))
.channel("outputChannel").get();
intContext.registration(sourceModifiedFlow).id(name)).autoStartup(true).register();
}
Вот как я создаю фактический FileReader
.
public FileReadingMessageSource getFileModifiedMessageSource() {
FileReadingMessageSource lm = new FileReadingMessageSource();
lm.setBeanName(String.format(SOURCE_MODIFIED_SUFFIX, name));
lm.setDirectory(new File(readingDir));
ModifiedOnlyFileScanner modifiedOnlyFileScanner = new ModifiedOnlyFileScanner(new File(readingDir));
modifiedOnlyFileScanner.setFilter(new AcceptAllFileListFilter<>());
lm.setScanner(modifiedOnlyFileScanner);
return lm;
}
Сканер файлов по умолчанию ( org.springframework.integration.file.FileReadingMessageSource.WatchServiceDirectoryScanner
) мне не подходил. В основном по двум причинам:
- при запуске генерируйте сообщение для каждого файла, присутствующего в каталоге
- даже если я настроил средство чтения
fileReadSource.setWatchEvents(WatchEventType.MODIFY);
, источник также отправляет сообщение при создании нового файла в каталог.
Итак, я заканчиваю тем, что пишу свой собственный сканер. Это полный код:
public class ModifiedOnlyFileScanner extends DefaultDirectoryScanner implements Lifecycle {
private final static Logger logger = LogManager.getLogger(FileDispatcher.class);
private final ConcurrentMap<Path, WatchKey> pathKeys = new ConcurrentHashMap<Path, WatchKey>();
private WatchService watcher;
private WatchEvent.Kind<?>[] kinds;
private File directory;
@Override
public void start() {
try {
this.watcher = FileSystems.getDefault().newWatchService();
}
catch (IOException e) {
logger.error("Failed to create watcher for " directory, e);
}
this.kinds = new WatchEvent.Kind<?>[]{StandardWatchEventKinds.ENTRY_MODIFY};
walkDirectory(directory.toPath(), null);
}
public ModifiedOnlyFileScanner(File directory) {
super();
this.directory = directory;
}
@Override
public void stop() {
try {
this.watcher.close();
this.watcher = null;
this.pathKeys.clear();
}
catch (IOException e) {
logger.error("Failed to close watcher for " directory, e);
}
}
@Override
public boolean isRunning() {
return true;
}
@Override
protected File[] listEligibleFiles(File directory) {
Assert.state(this.watcher != null, "The WatchService has'nt been started");
Collection<File> files = filesFromEvents();
if(files.size()>0){
logger.info("Detected Modified files " files);
}
return files.toArray(new File[files.size()]);
}
private Set<File> filesFromEvents() {
WatchKey key = this.watcher.poll();
Set<File> files = new LinkedHashSet<File>();
while (key != null) {
File parentDir = ((Path) key.watchable()).toAbsolutePath().toFile();
for (WatchEvent<?> event : key.pollEvents()) {
if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY ) {
Path item = (Path) event.context();
File file = new File(parentDir, item.toFile().getName());
if (file.exists()) {
//I do not dig into directories
if (!file.isDirectory()) {
files.remove(file);
files.add(file);
}
}
else {
logger.warn("A file [" file "] for the event [" event.kind()
"] doesn't exist. Ignored.");
}
}
}
key.reset();
key = this.watcher.poll();
}
return files;
}
private Set<File> walkDirectory(Path directory, final WatchEvent.Kind<?> kind) {
final Set<File> walkedFiles = new LinkedHashSet<File>();
try {
registerWatch(directory);
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
FileVisitResult fileVisitResult = super.preVisitDirectory(dir, attrs);
registerWatch(dir);
return fileVisitResu<
}
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
FileVisitResult fileVisitResult = super.visitFile(file, attrs);
if (!StandardWatchEventKinds.ENTRY_MODIFY.equals(kind)) {
walkedFiles.add(file.toFile());
}
return fileVisitResu<
}
});
}
catch (IOException e) {
logger.error("Failed to walk directory: " directory.toString(), e);
}
return walkedFiles;
}
private void registerWatch(Path dir) throws IOException {
if (!this.pathKeys.containsKey(dir)) {
if (logger.isDebugEnabled()) {
logger.debug("registering: " dir " for file events");
}
WatchKey watchKey = dir.register(this.watcher, this.kinds);
this.pathKeys.putIfAbsent(dir, watchKey);
}
}
}