Конвейер Apache Beam с использованием потоковой передачи FileIO не истощает поток данных.

# #java #google-cloud-dataflow #apache-beam

Вопрос:

Я пытаюсь разработать потоковый конвейер, который считывает данные из расположения файла и выполняет набор задач обработки полученных файлов, и в идеале должен сливаться при запуске. Однако, когда я протестировал функцию слива на бегуне потока данных, функция слива, похоже, не работает. В рабочих журналах отображаются следующие сообщения после фиксации запроса на утечку:

Отредактированные Журналы рабочих Отредактированное содержимое ссылается на папку GCS с шаблоном подстановочных знаков, например, gs://папка/*

Насколько я понимаю, эти сообщения журнала генерируются из преобразования FileIO.match (), которое я использую в своем коде. Чтобы сузить проблему, я попытался изолировать преобразование, чтобы выполнить следующее:

     public static void main(String args[]) {

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

        Pipeline p = Pipeline.create(options);

        PCollection<String> windowed_files = p.apply("Match files", FileIO.match()
                .filepattern("gs://<REDACTED_BUCKET>/<REDACTED_FOLDER>/*").continuously(Duration.standardSeconds(30), Watch.Growth.never()
                ).withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
                .apply("Read files", FileIO. readMatches())
                .apply("Map to filename", MapElements.into(TypeDescriptors.strings()).via(
                        (FileIO.ReadableFile file) -> {
                            return file.getMetadata().resourceId().getFilename();
                        }
                ));

        p.run();
    }
 

Однако, похоже, я не могу вызвать утечку даже для этой основной работы, так как журналы рабочих для этого конвейера показывают те же сообщения журнала, что и выше. Я не уверен, есть ли проблема с моим кодом или проблема с бегуном потока данных

Конвейер написан в Beam 2.32.0, Java SDK.