#airflow #dataflow #google-cloud-composer
#воздушный поток #поток данных #google-cloud-composer
Вопрос:
Я пытаюсь передать параметр из Google Composer в шаблон потока данных следующим образом, но это не работает.
# composer code
trigger_dataflow = DataflowTemplateOperator(
task_id="trigger_dataflow",
template="gs://mybucket/my_template",
dag=dag,
job_name='appsflyer_events_daily',
parameters={
"input": f'gs://my_bucket/' "{{ ds }}" "/*.gz"
}
)
# template code
class UserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--input',
default='gs://my_bucket/*.gz',
help='path of input file')
def main():
pipeline_options = PipelineOptions()
user_options = pipeline_options.view_as(UserOptions)
p = beam.Pipeline(options=pipeline_options)
lines = (
p
| MatchFiles(user_options.input)
)
Комментарии:
1. Какую версию Airflow вы используете?
2. @vdolez Я использую версию: 1.10.10 composer
3. Кроме того, вы можете предоставить некоторые журналы ошибок? Возможно, шаблон был создан неправильно, или, возможно, Composer не хватает прав доступа.
4. Не так много журналов @vdolez. Конвейер просто не соответствует никаким файлам и завершается без каких-либо ошибок. Если я жестко закодирую шаблон файла в шаблон, все будет работать нормально.
5. Можете ли вы запустить шаблон без Composer? Например, из пользовательского интерфейса? Вы выполнили шаги, описанные в документации ? Загруженный файл метаданных и т. Д
Ответ №1:
Вы можете передать, как показано ниже.
DataflowTemplateOperator(,
task_id="task1",
template=get_variable_value("template"),
on_failure_callback=update_job_message,
parameters={
"fileBucket": get_variable_value("file_bucket"),
"basePath": get_variable_value("path_input"),
"Day": "{{ json.loads(ti.xcom_pull(key=run_id))['day'] }}",
},
)
Мы используем Java, и в заданиях потока данных у нас есть класс опций get и set, как показано ниже
public interface MyOptions extends CommonOptions {
@Description("The output bucket")
@Validation.Required
ValueProvider<String> getFileBucket();
void setFileBucket(ValueProvider<String> value);
}
Нам нужно создать шаблон для этого задания потока данных, и этот шаблон будет запущен с помощью базы данных composer.
Ответ №2:
Переход от классического шаблона потока данных к гибкому шаблону исправил проблему.