Зависимость перекрестного dag воздушного потока

#python #airflow

#python #воздушный поток

Вопрос:

Я пытаюсь реализовать зависимость между двумя базами данных: parent_dag и child_dag. родительская база данных имеет две фиктивные задачи: leave_work, cook_dinner дочерняя база данных имеет три задачи: wait_for_dinner, have_dinner, play_with_food

wait_for_dinner использует external_task_sensor с external_dag_id в качестве parent_dag и external_task_id в качестве cook_dinner. start_date и schedule_interval одинаковы для обоих баз данных.

когда parent_dag завершается, то wait_for_dinner должен быть успешно завершен, и следующая задача должна выполняться, но в моем случае этого не происходит. (Он работает нормально, если базы данных не запланированы, т.е. schedule_interval = @once) Если базы данных запланированы, то wait_for_dinner находится в запущенном состоянии навсегда. он не прослушивает parent_dag success .

Пожалуйста, помогите мне с этим.

фрагменты кода:

parent_dag :

 dag = DAG(
    'Parent_dag', 
    default_args=default_args, 
    start_date = convert_to_utc(1606712300), 
    schedule_interval=datetime.timedelta(minutes = 5),
    is_paused_upon_creation = False,
    catchup = False
    )

leave_work = DummyOperator(
    task_id='leave_work',
    dag=dag,
)
cook_dinner = DummyOperator(
    task_id='cook_dinner',
    dag=dag,
)

leave_work >> cook_dinner
 

child_dag:

 dag = DAG(
    'Child_dag', 
    default_args=default_args,
    start_date = convert_to_utc(1606712300), 
    schedule_interval=datetime.timedelta(minutes=5),
    is_paused_upon_creation = False,
    catchup = False
    )

wait_for_dinner = ExternalTaskSensor(
    task_id='wait_for_dinner',
    external_dag_id='Parent_dag',
    external_task_id='cook_dinner',
    mode = "reschedule",
    timeout = 3600,
    dag = dag
)

have_dinner = DummyOperator(
    task_id='have_dinner',
    dag=dag,
)
play_with_food = DummyOperator(
    task_id='play_with_food',
    dag=dag,
)

wait_for_dinner >> have_dinner
wait_for_dinner >> play_with_food
 

Комментарии:

1. Можете ли вы попытаться установить для catchup значение True? Я подозревал, что у них никогда не было одинаковой даты выполнения

Ответ №1:

Когда я хочу запустить один dag из другого, я использую TriggerDagRunOperator родительский, а не ExternalTaskSensor дочерний. TriggerDagRunOperator является более надежным.

Ответ №2:

Взгляните на новую версию TriggerDagRunOperator в Airflow 2.0, это стало намного проще, чем раньше.
Посмотрите мое видео здесь https://youtu.be/8uKW0mPWmCk