# #java #google-cloud-dataflow #apache-beam
Вопрос:
Я пытаюсь разработать потоковый конвейер, который считывает данные из расположения файла и выполняет набор задач обработки полученных файлов, и в идеале должен сливаться при запуске. Однако, когда я протестировал функцию слива на бегуне потока данных, функция слива, похоже, не работает. В рабочих журналах отображаются следующие сообщения после фиксации запроса на утечку:
Отредактированное содержимое ссылается на папку GCS с шаблоном подстановочных знаков, например, gs://папка/*
Насколько я понимаю, эти сообщения журнала генерируются из преобразования FileIO.match (), которое я использую в своем коде. Чтобы сузить проблему, я попытался изолировать преобразование, чтобы выполнить следующее:
public static void main(String args[]) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);
PCollection<String> windowed_files = p.apply("Match files", FileIO.match()
.filepattern("gs://<REDACTED_BUCKET>/<REDACTED_FOLDER>/*").continuously(Duration.standardSeconds(30), Watch.Growth.never()
).withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
.apply("Read files", FileIO. readMatches())
.apply("Map to filename", MapElements.into(TypeDescriptors.strings()).via(
(FileIO.ReadableFile file) -> {
return file.getMetadata().resourceId().getFilename();
}
));
p.run();
}
Однако, похоже, я не могу вызвать утечку даже для этой основной работы, так как журналы рабочих для этого конвейера показывают те же сообщения журнала, что и выше. Я не уверен, есть ли проблема с моим кодом или проблема с бегуном потока данных
Конвейер написан в Beam 2.32.0, Java SDK.