#python #airflow
#python #воздушный поток
Вопрос:
Я пытаюсь реализовать зависимость между двумя базами данных: parent_dag и child_dag. родительская база данных имеет две фиктивные задачи: leave_work, cook_dinner дочерняя база данных имеет три задачи: wait_for_dinner, have_dinner, play_with_food
wait_for_dinner использует external_task_sensor с external_dag_id в качестве parent_dag и external_task_id в качестве cook_dinner. start_date и schedule_interval одинаковы для обоих баз данных.
когда parent_dag завершается, то wait_for_dinner должен быть успешно завершен, и следующая задача должна выполняться, но в моем случае этого не происходит. (Он работает нормально, если базы данных не запланированы, т.е. schedule_interval = @once) Если базы данных запланированы, то wait_for_dinner находится в запущенном состоянии навсегда. он не прослушивает parent_dag success .
Пожалуйста, помогите мне с этим.
фрагменты кода:
parent_dag :
dag = DAG(
'Parent_dag',
default_args=default_args,
start_date = convert_to_utc(1606712300),
schedule_interval=datetime.timedelta(minutes = 5),
is_paused_upon_creation = False,
catchup = False
)
leave_work = DummyOperator(
task_id='leave_work',
dag=dag,
)
cook_dinner = DummyOperator(
task_id='cook_dinner',
dag=dag,
)
leave_work >> cook_dinner
child_dag:
dag = DAG(
'Child_dag',
default_args=default_args,
start_date = convert_to_utc(1606712300),
schedule_interval=datetime.timedelta(minutes=5),
is_paused_upon_creation = False,
catchup = False
)
wait_for_dinner = ExternalTaskSensor(
task_id='wait_for_dinner',
external_dag_id='Parent_dag',
external_task_id='cook_dinner',
mode = "reschedule",
timeout = 3600,
dag = dag
)
have_dinner = DummyOperator(
task_id='have_dinner',
dag=dag,
)
play_with_food = DummyOperator(
task_id='play_with_food',
dag=dag,
)
wait_for_dinner >> have_dinner
wait_for_dinner >> play_with_food
Комментарии:
1. Можете ли вы попытаться установить для catchup значение True? Я подозревал, что у них никогда не было одинаковой даты выполнения
Ответ №1:
Когда я хочу запустить один dag из другого, я использую TriggerDagRunOperator
родительский, а не ExternalTaskSensor
дочерний. TriggerDagRunOperator
является более надежным.
Ответ №2:
Взгляните на новую версию TriggerDagRunOperator в Airflow 2.0, это стало намного проще, чем раньше.
Посмотрите мое видео здесь https://youtu.be/8uKW0mPWmCk