Для задач воздушного потока установлено значение «no_status», если значение catchup равно True

#airflow #airflow-scheduler

Вопрос:

Я пытаюсь настроить ряд задач воздушного потока для заполнения некоторых данных (catchup=True). Как только DAG развернута и не приостановлена, первое задание выполняется успешно, но все последующие запуски имеют свои задачи, no_status и они никогда не запускаются. введите описание изображения здесь

Я пробовал варианты переименования DAG, перезапуска сервера и планировщика воздушного потока, очистки старых журналов, но здесь я не добился никакого прогресса.

Мысли?

Код DAG:

 default_args = {
    "owner": "me",
    "retries": 2,
    "retry_delay": timedelta(minutes=2),
    "sla": timedelta(hours=1),
    "start_date": "2021-01-01T00:00",
}

dag = DAG(
    catchup=True,
    dag_id="ingest_dag_testing_6",
    dagrun_timeout=timedelta(hours=1),
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="30 * * * *",
)

DATA_SOURCE_TYPES = [
    {
        "target_name": "task_a",
        "children": [
            {
                "target_name": "subtask_a1",
            },
            {
                "target_name": "subtask_a2",
            },
        ],
    }
]

with dag:
    for dst in DATA_SOURCE_TYPES:
        sub_ingest_tasks = []

        ingest_task = PythonOperator(
            task_id=f"ingest_{dst.get('target_name')}",
            python_callable=run_ingestion,
            op_args=[logger, exe_date, dst],
        )
        if dst.get("children"):
            for sdst in dst.get("children"):
                sub_ingest_tasks.append(
                    PythonOperator(
                        task_id=f"ingest_{sdst.get('target_name')}",
                        python_callable=run_ingestion,
                        op_args=[logger, exe_date, sdst],
                    )
                )

        ingest_task >> sub_ingest_tasks

 

Комментарии:

1. Протестировал ваш пример кода, и он отлично работает для меня.

Ответ №1:

Ваш код выполняется просто отлично.

Я создал выполняемый пример из вашего кода (так как в нем нет импорта/вызываемых объектов):

 from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import timedelta


def run_ingestion(**context):
    print("Hello World")

default_args = {
    "owner": "me",
    "retries": 2,
    "retry_delay": timedelta(minutes=2),
    "sla": timedelta(hours=1),
    "start_date": "2021-01-01T00:00",
}

dag = DAG(
    catchup=True,
    dag_id="ingest_dag_testing_6",
    dagrun_timeout=timedelta(hours=1),
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="30 * * * *",
)

DATA_SOURCE_TYPES = [
    {
        "target_name": "task_a",
        "children": [
            {
                "target_name": "subtask_a1",
            },
            {
                "target_name": "subtask_a2",
            },
        ],
    }
]

with dag:
    for dst in DATA_SOURCE_TYPES:
        sub_ingest_tasks = []

        ingest_task = PythonOperator(
            task_id=f"ingest_{dst.get('target_name')}",
            python_callable=run_ingestion,
            #op_args=[logger, exe_date, dst],
        )
        if dst.get("children"):
            for sdst in dst.get("children"):
                sub_ingest_tasks.append(
                    PythonOperator(
                        task_id=f"ingest_{sdst.get('target_name')}",
                        python_callable=run_ingestion,
                        #op_args=[logger, exe_date, sdst],
                    )
                )

        ingest_task >> sub_ingest_tasks
 

Вы можете видеть, что это работает нормально:

введите описание изображения здесь

Если вы используете старую версию Airflow, возможно, что изменение dag_id может решить проблему. Возможно, есть некоторые старые следы записей базы данных, связанных с этим dag_id, которые не были очищены должным образом. Планировщик был значительно переработан в более поздних версиях.

Если бы вышесказанное не помогло, вероятно, единственным решением было бы обновление до последней версии Airflow, поскольку, вероятно, это ошибка в более старых версиях, которая была исправлена в процессе (поскольку код, которым вы поделились, не воспроизводит проблему, которую вы описываете в последней версии Airflow).

Комментарии:

1. Спасибо, @Elad. Я провел некоторые дополнительные работы по устранению неполадок и обнаружил, что задания расписания были выбраны по времени после завершения первого задания. Мне придется еще немного повозиться с этими настройками.