#python #airflow #airflow-scheduler
#python #воздушный поток #воздушный поток-планировщик
Вопрос:
Я создал простую базу данных Airflow, которую я запланировал запускать каждые 5 минут. Однако на самом деле он запускается каждые 57 секунд.
Я попытался настроить расписание, используя оба:
- строка выражения расписания CRON
- Объект datetime timedelta
И все равно получаем одинаковый результат для обеих версий.
Я также пробовал разные интервалы расписания, и результат всегда один и тот же. Он запускается каждые 57 секунд.
Я хотел бы знать, почему он запускается каждые 57 секунд, а не через указанный мной интервал, и как мне это исправить.
Код
Вот код, который я использую для создания моей базы данных, размещенной в каталоге. ~/airflow/dags
Задача просто выводит сообщение с меткой времени в файл журнала.
from datetime import timedelta
import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
# DEFAULT ARGUMENTS TO PASS TO EACH OPERATOR
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'mydag2',
default_args=default_args,
description='A sample DAG',
schedule_interval="*/5 * * * *",
# schedule_interval=timedelta(minutes=5),
)
t1 = BashOperator(
task_id='print_message',
bash_command=f'echo "{datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")} tick" >> ~/logs/log.log',
dag=dag,
)
Результаты
Просмотр файла журнала показывает, что команда выполняется каждые 57 секунд.
2020-10-25 01:30:17 tick
2020-10-25 01:31:14 tick
2020-10-25 01:32:11 tick
2020-10-25 01:33:08 tick
2020-10-25 01:34:06 tick
2020-10-25 01:35:03 tick
2020-10-25 01:36:00 tick
2020-10-25 01:36:57 tick
2020-10-25 01:37:54 tick
2020-10-25 01:38:51 tick
Но, глядя на запуски базы данных в веб-приложении, в нем говорится, что они выполнялись каждые 5 минут, но за несколько дней до этого (хотя новые записи появляются каждые 57 секунд)
Контекст
Я использую следующую версию:
- Воздушный поток 1.10.12
- Ubuntu 20.04
- python 3.8.5
Комментарии:
1. Вы пробовали играть с
'start_date': days_ago(2),
? Скорее всего, это не так, потому что это не объясняет 57-секундную вещь, но если в вашем кластере включена функция «догнать по умолчанию», airflow попытается выполнить обратное заполнение за последние два дня. towardsdatascience.com / … объясняет кое-что из этого
Ответ №1:
Это из—за свойства catchup_by_default DAG. По умолчанию при создании он запускает пакеты сначала как пакет, start_date schedule interval
а затем выполняет итерацию для пакета на основе schedule interval
. Поскольку это наверстывание упущенного, он попытается завершить все запуски как можно скорее ( max_active_runs
ограничивая только количество пакетов, выполняемых в любой момент времени).
В вашем DAG начальная дата такова days_ago(2)
, что она пытается догнать все пакеты между датой начала и сейчас. Вот почему вы видите запись для каждой даты выполнения.
Если вы не хотите, чтобы он выполнялся для прошлых пакетов, вы можете установить catchup
как False