#python #apache-beam
Вопрос:
Мы пытаемся получить значение времени выполнения (путь к источнику) в конвейере потока данных (apache beam с Python). Когда мы выполняем чтение из текста, значение runtimevalue работает, но когда мы делаем то же самое с fileio.Совпадающие файлы, это не удается.
**
- Ниже показано, что при создании шаблона все работает нормально —
**
pipeline_options = PipelineOptions(pipeline_args) pcoll = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) test = pcoll | 'Read' gt;gt; ReadFromText(wordcount_options.input)
**-
Ниже не работает. Это не удается, когда мы создаем шаблон
**
wordcount_options = pipeline_options.view_as(WordcountOptions) test = pcoll | 'Get matches for input' gt;gt; fileio.MatchFiles(wordcount_options.input)
Ошибка, которую я получаю, — «apache_beam.typehints.декораторы.Ошибка проверки типа: Нарушение подсказки типа для ‘ParDo(_MatchAllFn)’: требуется lt;класс ‘str’gt;, но есть lt;класс ‘str’gt;lt;класс ‘apache_beam.options.value_provider.RuntimeValueProvider ‘gt; для шаблона файла»
Я попробовал с помощью fileio.Сопоставьте файлы(wordcount_options.input) и fileio.Сопоставление файлов(wordcount_options.input.get()), оба из них завершаются ошибкой.
Как мы должны использовать входной путь из значения времени выполнения для этого файла.Совпадающие файлы ?
Нам нужно получить исходный путь из шаблона, так как исходный путь меняется ежедневно (в зависимости от текущей даты выполнения). Любые предложения по преодолению этого были бы очень полезны.
Спасибо.