Airflow ExternalTaskSensor не распознает задачу в DAG со статусом SUCCESS

#python #airflow

#python #поток воздуха

Вопрос:

Получил test_dag_father, выглядящий так:

 from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.utils.state import State

WORKFLOW_DAG_ID = "test_dag_father"

WORKFLOW_START_DATE = datetime(year=2020, month=4, day=27)

workflow_default_args = {
    "owner": "me",
    "depends_on_past": False,
    "start_date": WORKFLOW_START_DATE,
    "email": [mail],
    "email_on_failure": True,
    "email_on_retry": True,
    "schedule_interval": "*/5 * * * *",
}

now = datetime.now()

dag = DAG(
    dag_id=WORKFLOW_DAG_ID,
    start_date=WORKFLOW_START_DATE,
    default_args=workflow_default_args,
    catchup=False,
    schedule_interval="*/5 * * * *",
)


t1 = TriggerDagRunOperator(task_id="run_son_dag", trigger_dag_id="test_dag_son", dag=dag)

t2 = ExternalTaskSensor(
    task_id="waiting_for_dag",
    external_dag_id="test_dag_son",
    external_task_id="special",
    check_existence=True,
    poke_interval=5,
    allowed_states=[State.SUCCESS, State.SKIPPED, State.FAILED, State.NONE],
    timeout=90,
    dag=dag,
)

t3 = BashOperator(task_id="echo3", bash_command='echo "Here is the message: 3"', dag=dag, retries=1,)


t1 >> t2 >> t3 

и управляемый dag называется test_dag_son, который выглядит следующим образом:

 from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

WORKFLOW_DAG_ID = "test_dag_son"

WORKFLOW_START_DATE = datetime(year=2020, month=4, day=27)

workflow_default_args = {
    "owner": "me",
    "depends_on_past": False,
    "start_date": WORKFLOW_START_DATE,
    "email": [mail],
    "email_on_failure": True,
    "email_on_retry": True,
    "schedule_interval": None,
}


dag = DAG(dag_id=WORKFLOW_DAG_ID, start_date=WORKFLOW_START_DATE, default_args=workflow_default_args, catchup=False)



t1 = BashOperator(task_id="echo1", bash_command='echo "Here is the message: 1"', dag=dag, retries=1,)

t2 = BashOperator(task_id="special", bash_command='echo "Here is the message: 2"', dag=dag, retries=1,)


t1 >> t2 

При запуске test_dag_father я получаю от ExternalTaskSensor этот журнал:

 [2020-11-26 16:07:54,558] {{base_task_runner.py:133}} INFO - Running: ['airflow', 'run', 'test_dag_father', 'waiting_for_dag', **'2020-11-26T16:00:00 00:00'**, '--job_id', '2435', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/DAGs/tests/test_dag_father.py', '--cfg_path', '/tmp/tmpdif2n401']

[2020-11-26 16:07:56,216] {{external_task_sensor.py:113}} INFO - Poking for test_dag_son.special on **2020-11-26T16:00:00 00:00** ... 
[2020-11-26 16:07:56,242] {{base_task_runner.py:115}} INFO - Job 2435: Subtask waiting_for_dag [2020-11-26 16:07:56,241] {{dagbag.py:90}} INFO - Filling up the DagBag from /usr/local/airflow/dags/DAGs/tests/test_dag_son.py
[2020-11-26 16:08:01,302] {{external_task_sensor.py:113}} INFO - Poking for test_dag_son.special on 2020-11-26T16:00:00 00:00 ... 
[2020-11-26 16:08:01,326] {{base_task_runner.py:115}} INFO - Job 2435: Subtask waiting_for_dag [2020-11-26 16:08:01,326] {{dagbag.py:90}} INFO - Filling up the DagBag from /usr/local/airflow/dags/DAGs/tests/test_dag_son.py
[2020-11-26 16:08:06,381] {{external_task_sensor.py:113}} INFO - Poking for test_dag_son.special on 2020-11-26T16:00:00 00:00 ... 
[2020-11-26 16:08:06,406] {{base_task_runner.py:115}} INFO - Job 2435: Subtask waiting_for_dag [2020-11-26 16:08:06,405] {{dagbag.py:90}} INFO - Filling up the DagBag from /usr/local/airflow/dags/DAGs/tests/test_dag_son.py
[2020-11-26 16:08:11,492] {{external_task_sensor.py:113}} INFO - Poking for test_dag_son.special on 2020-11-26T16:00:00 00:00 ... 
[2020-11-26 16:08:11,516] {{base_task_runner.py:115}} INFO - Job 2435: Subtask waiting_for_dag [2020-11-26 16:08:11,516] {{dagbag.py:90}} INFO - Filling up the DagBag from /usr/local/airflow/dags/DAGs/tests/test_dag_son.py
[2020-11-26 16:08:16,576] {{external_task_sensor.py:113}} INFO - Poking for test_dag_son.special on 2020-11-26T16:00:00 00:00 ... 
[2020-11-26 16:09:28,088] {{taskinstance.py:1047}} ERROR - Snap. Time is OUT. 

Тайм-аут составляет 90 секунд, так как test_dag_son завершается менее чем за 30 секунд. Я пытался использовать:

  1. Добавление execution_delta, но это не требуется, так как время для обоих dag одинаково (я выделил оба в журналах жирным шрифтом).
  2. Я запустил test_dag_father, используя schedule. У test_dag_son не должно быть никакого расписания.
  3. специальная задача успешно завершена и УСПЕШНО работает в пользовательском интерфейсе airflow.

Я был бы очень благодарен, если бы понял, что я делаю неправильно.

Ответ №1:

Невозможно, чтобы обе базы данных были запущены в одно и то же расписание / дату выполнения. Когда вы запускаете DAG с помощью TriggerDagRunOperator, это похоже на запуск этого DAG, test_dag_son вручную. Это означает, что дата выполнения равна моменту запуска вашего DAG.
Однако ExternalTaskSensor проверяет наличие специальной задачи в дату выполнения 2020-11-26T16:00:00 00:00, чего никогда не будет. Вы даже не можете использовать execution_delta, поскольку дата выполнения, указанная TriggerDagRunOperator, всегда будет отличаться.

Появилась новая версия TriggerDagRunOperator, позволяющая вам дождаться вашего инициированного DAG перед выполнением задачи. Таким образом, больше нет необходимости использовать ExternalTaskSensor.
Я сделал видео об этом прямо там: https://youtu.be/8uKW0mPWmCk

Наслаждайтесь 🙂