Можем ли мы передать переменную x_com следующей задаче в DAG в качестве параметра?

# #google-cloud-platform #google-cloud-composer #airflow

Вопрос:

Пожалуйста, рассмотрите этот сценарий:

У меня есть облачный Dag в cloud composer, который запускает облачную функцию. Эта функция обращается к api, а затем сохраняет таблицы в GCS. Теперь моя DAG Airflow(с использованием Cloud Composer) запускает следующий этап, т. е. задание Dataproc, которое берет таблицу из GCS и помещает ее в BQ, но когда я запускаю шаблон рабочего процесса Dataproc, я передаю параметр, который является именем таблицы из самой dag, и этот параметр я хочу выбрать из x_com.

Вот фрагмент кода выдает ошибку, что он не определен

 dataproc_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
# The task id of your job
task_id="dataproc_job",
# The template id of your workflow
template_id="newwf1",
project_id='#######',
region="us-central1",
parameters={"TABLE_NAME":ti.xcom_pull(task_ids=simple_http}
)
 

Как я могу устранить эту ошибку и передать значение x_com в качестве параметра для моего следующего шага в DAG ?

Ответ №1:

Предполагая parameters , что параметр может быть шаблонизирован (он же указан как a templated_field в DataprocWorkflowTemplateInstantiateOperator ), вы можете использовать выражение Jinja для доступа к XCom значению.

 DataprocWorkflowTemplateInstantiateOperator(
    # The task id of your job
    task_id="dataproc_job",
    # The template id of your workflow
    template_id="newwf1",
    project_id='#######',
    region="us-central1", 
    parameters={"TABLE_NAME": "{{ ti.xcom_pull(task_ids='simple_http' }}"} 
)

 

Подробнее о шаблонах Jinja в потоке воздуха здесь, а также об использовании XComs с шаблонами, упомянутыми в этом документе.

Кроме того, этот оператор выглядит как очень старый оператор Airflow 1. Если вы в состоянии, я бы настоятельно рекомендовал перейти на Airflow 2. Существует множество улучшений как в функциональности, так и в производительности, которые значительно улучшат работу вашего воздушного потока и конвейера.