#airflow #airflow-scheduler
Вопрос:
Я пытаюсь настроить ряд задач воздушного потока для заполнения некоторых данных (catchup=True). Как только DAG развернута и не приостановлена, первое задание выполняется успешно, но все последующие запуски имеют свои задачи, no_status
и они никогда не запускаются.
Я пробовал варианты переименования DAG, перезапуска сервера и планировщика воздушного потока, очистки старых журналов, но здесь я не добился никакого прогресса.
Мысли?
Код DAG:
default_args = {
"owner": "me",
"retries": 2,
"retry_delay": timedelta(minutes=2),
"sla": timedelta(hours=1),
"start_date": "2021-01-01T00:00",
}
dag = DAG(
catchup=True,
dag_id="ingest_dag_testing_6",
dagrun_timeout=timedelta(hours=1),
default_args=default_args,
max_active_runs=1,
schedule_interval="30 * * * *",
)
DATA_SOURCE_TYPES = [
{
"target_name": "task_a",
"children": [
{
"target_name": "subtask_a1",
},
{
"target_name": "subtask_a2",
},
],
}
]
with dag:
for dst in DATA_SOURCE_TYPES:
sub_ingest_tasks = []
ingest_task = PythonOperator(
task_id=f"ingest_{dst.get('target_name')}",
python_callable=run_ingestion,
op_args=[logger, exe_date, dst],
)
if dst.get("children"):
for sdst in dst.get("children"):
sub_ingest_tasks.append(
PythonOperator(
task_id=f"ingest_{sdst.get('target_name')}",
python_callable=run_ingestion,
op_args=[logger, exe_date, sdst],
)
)
ingest_task >> sub_ingest_tasks
Комментарии:
1. Протестировал ваш пример кода, и он отлично работает для меня.
Ответ №1:
Ваш код выполняется просто отлично.
Я создал выполняемый пример из вашего кода (так как в нем нет импорта/вызываемых объектов):
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import timedelta
def run_ingestion(**context):
print("Hello World")
default_args = {
"owner": "me",
"retries": 2,
"retry_delay": timedelta(minutes=2),
"sla": timedelta(hours=1),
"start_date": "2021-01-01T00:00",
}
dag = DAG(
catchup=True,
dag_id="ingest_dag_testing_6",
dagrun_timeout=timedelta(hours=1),
default_args=default_args,
max_active_runs=1,
schedule_interval="30 * * * *",
)
DATA_SOURCE_TYPES = [
{
"target_name": "task_a",
"children": [
{
"target_name": "subtask_a1",
},
{
"target_name": "subtask_a2",
},
],
}
]
with dag:
for dst in DATA_SOURCE_TYPES:
sub_ingest_tasks = []
ingest_task = PythonOperator(
task_id=f"ingest_{dst.get('target_name')}",
python_callable=run_ingestion,
#op_args=[logger, exe_date, dst],
)
if dst.get("children"):
for sdst in dst.get("children"):
sub_ingest_tasks.append(
PythonOperator(
task_id=f"ingest_{sdst.get('target_name')}",
python_callable=run_ingestion,
#op_args=[logger, exe_date, sdst],
)
)
ingest_task >> sub_ingest_tasks
Вы можете видеть, что это работает нормально:
Если вы используете старую версию Airflow, возможно, что изменение dag_id
может решить проблему. Возможно, есть некоторые старые следы записей базы данных, связанных с этим dag_id, которые не были очищены должным образом. Планировщик был значительно переработан в более поздних версиях.
Если бы вышесказанное не помогло, вероятно, единственным решением было бы обновление до последней версии Airflow, поскольку, вероятно, это ошибка в более старых версиях, которая была исправлена в процессе (поскольку код, которым вы поделились, не воспроизводит проблему, которую вы описываете в последней версии Airflow).
Комментарии:
1. Спасибо, @Elad. Я провел некоторые дополнительные работы по устранению неполадок и обнаружил, что задания расписания были выбраны по времени после завершения первого задания. Мне придется еще немного повозиться с этими настройками.