Spring Интеграция FileReadingMessageSource использование UseWatchService

#java #spring #spring-boot #spring-integration

#java #spring #spring-загрузка #spring-интеграция

Вопрос:

Я использую FileReadingMessageSource в качестве источника сообщения в Spring Integration ESB.

В частности, я хотел бы использовать WatchServiceDirectoryScanner . Я использую конфигурацию на основе аннотаций с загрузкой spring.

Если я не использую WatchServiceDirectoryScanner , все в порядке, но когда я устанавливаю setUseWatchService(true) , я не могу заставить его работать.

Только файл, присутствующий в каталоге при запуске приложения, генерирует новые сообщения в канале. Файлы, скопированные или созданные в каталоге, не генерируют никаких сообщений.

Я хотел бы использовать WatchServiceDirectoryScanner , поскольку я хочу сгенерировать только сообщение

Это код, в котором я настраиваю канал, адаптер и источник:

     @Bean(name="source")
    public FileReadingMessageSource getFileMessageSource(){
        FileReadingMessageSource lm = new FileReadingMessageSource();
        lm.setBeanName("fileMessageSource");
        lm.setDirectory(new File("C:/DestDir"));
        lm.setAutoCreateDirectory(true);

        lm.setFilter(new AcceptAllFileListFilter<>());
        lm.setUseWatchService(true);
        lm.setWatchEvents(WatchEventType.CREATE);
        return lm;
    }

@Bean(name="channel")
    public PublishSubscribeChannel getFileChannel(){
        PublishSubscribeChannel psc = new PublishSubscribeChannel();
        psc.setLoggingEnabled(true);
        psc.setComponentName("channelexp");
        return psc;

    }


    @Bean
    @DependsOn({"source","channel"})
    public SourcePollingChannelAdapter  getChannelAdapter(){
        SourcePollingChannelAdapter spca = new SourcePollingChannelAdapter();
        FileReadingMessageSource frms = context.getBean(FileReadingMessageSource.class);
        PublishSubscribeChannel psc = context.getBean("channel",PublishSubscribeChannel.class);
        spca.setSource(frms);
        spca.setOutputChannel(psc);
        return spca;
    }
  

Даже если я использую @InboundChannelAdapter , разницы нет

 @Bean(name="source")
    @InboundChannelAdapter(value = "channel", poller = @Poller(fixedDelay = "1000"))
    public FileReadingMessageSource getFileMessageSource(){
        FileReadingMessageSource lm = new FileReadingMessageSource();
        lm.setDirectory(new File("C:/fromDir/"));
        lm.setFilter(new AcceptAllFileListFilter<>());
        lm.setUseWatchService(true);
        lm.setWatchEvents(WatchEventType.CREATE);
        return lm;
    }
  

Где я делаю неправильно?

Комментарии:

1. Включите ведение журнала ОТЛАДКИ, чтобы наблюдать за событиями просмотра.

Ответ №1:

Я не вижу кода опроса в вашей конфигурации, но мне интересно, есть ли у вас что-нибудь еще:

  1. Вы должны использовать @EnableIntegration
  2. SourcePollingChannelAdapter Должен быть предоставлен с PollerMetadata
  3. Или просто подумайте об использовании @InboundChannelAdapter (см. Справочное руководство)
  4. Это действительно выглядит странно для использования context.getBean() во время определения компонента. Вы должны использовать там инъекцию. Первый предназначен для выполнения, а не для фазы инициализации.

Редактировать

Spring Интеграция Java DSL пример:

 @SpringBootApplication
public class FileChangeLineSeparator {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = new SpringApplicationBuilder(FileChangeLineSeparator.class)
                .web(false)
                .run(args);
        System.out.println("Put a windows file with a .txt extension in /tmp/in,"
                  "nthe file will be converted to Un*x and placed in"
                  "n/tmp/out"
                  "nnHit enter to terminate");
        System.in.read();
        context.close();
    }

@Bean
@InboundChannelAdapter(value = "channel", poller = @Poller(fixedDelay = "1000"))
public FileReadingMessageSource getFileMessageSource() {
    FileReadingMessageSource lm = new FileReadingMessageSource();
    lm.setDirectory(new File("/tmp/in"));
    lm.setFilter(new AcceptAllFileListFilter<>());
    lm.setUseWatchService(true);
    lm.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE);
    return lm;
}

@Bean
public IntegrationFlow fileToFile() {
    return IntegrationFlows.from("channel")                 
                .transform(Transformers.fileToString())
                .transform("payload.replaceAll('rn', 'n')")
                .handle(Files.outboundAdapter("'/tmp/out'")
                        .autoCreateDirectory(true))
                .get();
    }

}
  

В конечном итоге я создал новые файлы, и они правильно подобраны и обработаны.

Комментарии:

1. Да, я освобождаю, это странно, но я планирую использовать этот код во время выполнения.

2. Что ж, для среды выполнения многое предстоит сделать… Подумайте о том, чтобы изучить функцию Spring Integration Java IntegrationFlowContext DSL: spring.io/blog/2016/10/14 /. … Просто прочитайте все связанные сообщения в блогах о новых функциях. Также взгляните на пример github.com/spring-projects/spring-integration-samples/tree /…

Ответ №2:

Даже если ответ @Artem Bilan был очень полезен (вот почему я его принял) Я публикую свой собственный ответ, чтобы показать свое окончательное решение.

Это полный пример

Мне приходилось отслеживать каталог и создавать сообщение каждый раз при изменении файла. Я использую Spring ESB FileReadingMessageSource . Кроме того, мне пришлось создавать экземпляры всех компонентов во время выполнения, по этой причине я использовал Spring Integration Java DSL IntegrationFlowContext .

Вот как я инициализировал источник во время выполнения (сообщение отправляется на "output" канал).

     @Autowired
    private IntegrationFlowContext intContext;

public void initDispatcher(String name,Long pollingFreq) throws HandlerConfigurationException {
        logger.info("Source "  name  " Inizializing");


        IntegrationFlow sourceModifiedFlow = IntegrationFlows
                .from(this.getFileModifiedMessageSource(), s -> s.poller(Pollers.fixedRate(pollingFreq, TimeUnit.MILLISECONDS)))
                .channel("outputChannel").get();
        intContext.registration(sourceModifiedFlow).id(name)).autoStartup(true).register();
    }
  

Вот как я создаю фактический FileReader .

  public FileReadingMessageSource getFileModifiedMessageSource() {
            FileReadingMessageSource lm = new FileReadingMessageSource();
            lm.setBeanName(String.format(SOURCE_MODIFIED_SUFFIX, name));
            lm.setDirectory(new File(readingDir));
            ModifiedOnlyFileScanner modifiedOnlyFileScanner = new ModifiedOnlyFileScanner(new File(readingDir));
            modifiedOnlyFileScanner.setFilter(new AcceptAllFileListFilter<>());
            lm.setScanner(modifiedOnlyFileScanner);

            return lm;
        }
  

Сканер файлов по умолчанию ( org.springframework.integration.file.FileReadingMessageSource.WatchServiceDirectoryScanner ) мне не подходил. В основном по двум причинам:

  1. при запуске генерируйте сообщение для каждого файла, присутствующего в каталоге
  2. даже если я настроил средство чтения fileReadSource.setWatchEvents(WatchEventType.MODIFY); , источник также отправляет сообщение при создании нового файла в каталог.

Итак, я заканчиваю тем, что пишу свой собственный сканер. Это полный код:

 public class ModifiedOnlyFileScanner extends DefaultDirectoryScanner implements Lifecycle {
    private final static Logger logger = LogManager.getLogger(FileDispatcher.class);


    private final ConcurrentMap<Path, WatchKey> pathKeys = new ConcurrentHashMap<Path, WatchKey>();

    private WatchService watcher;

    private WatchEvent.Kind<?>[] kinds;

    private File directory;

    @Override
    public void start() {
        try {
            this.watcher = FileSystems.getDefault().newWatchService();
        }
        catch (IOException e) {
            logger.error("Failed to create watcher for "   directory, e);
        }

        this.kinds = new WatchEvent.Kind<?>[]{StandardWatchEventKinds.ENTRY_MODIFY};
        walkDirectory(directory.toPath(), null);

    }

    public ModifiedOnlyFileScanner(File directory) {
        super();
        this.directory = directory;
    }

    @Override
    public void stop() {
        try {
            this.watcher.close();
            this.watcher = null;
            this.pathKeys.clear();
        }
        catch (IOException e) {
            logger.error("Failed to close watcher for "   directory, e);
        }
    }

    @Override
    public boolean isRunning() {
        return true;
    }

    @Override
    protected File[] listEligibleFiles(File directory) {
        Assert.state(this.watcher != null, "The WatchService has'nt been started");
        Collection<File> files = filesFromEvents();
        if(files.size()>0){
            logger.info("Detected Modified files " files);
        }
        return files.toArray(new File[files.size()]);
    }

    private Set<File> filesFromEvents() {
        WatchKey key = this.watcher.poll();
        Set<File> files = new LinkedHashSet<File>();
        while (key != null) {
            File parentDir = ((Path) key.watchable()).toAbsolutePath().toFile();
            for (WatchEvent<?> event : key.pollEvents()) {
                if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY ) {
                    Path item = (Path) event.context();
                    File file = new File(parentDir, item.toFile().getName());
                    if (file.exists()) {
                        //I do not dig into directories
                        if (!file.isDirectory()) {
                            files.remove(file);
                            files.add(file);
                        }
                    }
                    else {
                            logger.warn("A file ["   file   "] for the event ["   event.kind()  
                                    "] doesn't exist. Ignored.");
                    }
                }
            }
            key.reset();
            key = this.watcher.poll();
        }
        return files;
    }

    private Set<File> walkDirectory(Path directory, final WatchEvent.Kind<?> kind) {
        final Set<File> walkedFiles = new LinkedHashSet<File>();
        try {
            registerWatch(directory);
            Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {

                @Override
                public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
                    FileVisitResult fileVisitResult = super.preVisitDirectory(dir, attrs);
                    registerWatch(dir);
                    return fileVisitResu<
                }

                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                    FileVisitResult fileVisitResult = super.visitFile(file, attrs);
                    if (!StandardWatchEventKinds.ENTRY_MODIFY.equals(kind)) {
                        walkedFiles.add(file.toFile());
                    }
                    return fileVisitResu<
                }

            });
        }
        catch (IOException e) {
            logger.error("Failed to walk directory: "   directory.toString(), e);
        }
        return walkedFiles;
    }

    private void registerWatch(Path dir) throws IOException {
        if (!this.pathKeys.containsKey(dir)) {
            if (logger.isDebugEnabled()) {
                logger.debug("registering: "   dir   " for file events");
            }
            WatchKey watchKey = dir.register(this.watcher, this.kinds);
            this.pathKeys.putIfAbsent(dir, watchKey);
        }
    }
}