# #google-cloud-platform #google-cloud-composer #airflow
Вопрос:
Пожалуйста, рассмотрите этот сценарий:
У меня есть облачный Dag в cloud composer, который запускает облачную функцию. Эта функция обращается к api, а затем сохраняет таблицы в GCS. Теперь моя DAG Airflow(с использованием Cloud Composer) запускает следующий этап, т. е. задание Dataproc, которое берет таблицу из GCS и помещает ее в BQ, но когда я запускаю шаблон рабочего процесса Dataproc, я передаю параметр, который является именем таблицы из самой dag, и этот параметр я хочу выбрать из x_com.
Вот фрагмент кода выдает ошибку, что он не определен
dataproc_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
# The task id of your job
task_id="dataproc_job",
# The template id of your workflow
template_id="newwf1",
project_id='#######',
region="us-central1",
parameters={"TABLE_NAME":ti.xcom_pull(task_ids=simple_http}
)
Как я могу устранить эту ошибку и передать значение x_com в качестве параметра для моего следующего шага в DAG ?
Ответ №1:
Предполагая parameters
, что параметр может быть шаблонизирован (он же указан как a templated_field
в DataprocWorkflowTemplateInstantiateOperator
), вы можете использовать выражение Jinja для доступа к XCom
значению.
DataprocWorkflowTemplateInstantiateOperator(
# The task id of your job
task_id="dataproc_job",
# The template id of your workflow
template_id="newwf1",
project_id='#######',
region="us-central1",
parameters={"TABLE_NAME": "{{ ti.xcom_pull(task_ids='simple_http' }}"}
)
Подробнее о шаблонах Jinja в потоке воздуха здесь, а также об использовании XComs
с шаблонами, упомянутыми в этом документе.
Кроме того, этот оператор выглядит как очень старый оператор Airflow 1. Если вы в состоянии, я бы настоятельно рекомендовал перейти на Airflow 2. Существует множество улучшений как в функциональности, так и в производительности, которые значительно улучшат работу вашего воздушного потока и конвейера.