Поток воздуха выполняется каждые 57 секунд, независимо от интервала расписания

#python #airflow #airflow-scheduler

#python #воздушный поток #воздушный поток-планировщик

Вопрос:

Я создал простую базу данных Airflow, которую я запланировал запускать каждые 5 минут. Однако на самом деле он запускается каждые 57 секунд.

Я попытался настроить расписание, используя оба:

  • строка выражения расписания CRON
  • Объект datetime timedelta

И все равно получаем одинаковый результат для обеих версий.

Я также пробовал разные интервалы расписания, и результат всегда один и тот же. Он запускается каждые 57 секунд.

Я хотел бы знать, почему он запускается каждые 57 секунд, а не через указанный мной интервал, и как мне это исправить.

Код

Вот код, который я использую для создания моей базы данных, размещенной в каталоге. ~/airflow/dags Задача просто выводит сообщение с меткой времени в файл журнала.

 from datetime import timedelta
import datetime
from airflow import DAG

from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# DEFAULT ARGUMENTS TO PASS TO EACH OPERATOR
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    }

dag = DAG(
    'mydag2',
    default_args=default_args,
    description='A sample DAG',
    schedule_interval="*/5 * * * *",
    # schedule_interval=timedelta(minutes=5),
    )

t1 = BashOperator(
    task_id='print_message',
    bash_command=f'echo "{datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")}  tick" >> ~/logs/log.log',
    dag=dag,
    )

  

Результаты

Просмотр файла журнала показывает, что команда выполняется каждые 57 секунд.

 2020-10-25 01:30:17  tick
2020-10-25 01:31:14  tick
2020-10-25 01:32:11  tick
2020-10-25 01:33:08  tick
2020-10-25 01:34:06  tick
2020-10-25 01:35:03  tick
2020-10-25 01:36:00  tick
2020-10-25 01:36:57  tick
2020-10-25 01:37:54  tick
2020-10-25 01:38:51  tick
  

Но, глядя на запуски базы данных в веб-приложении, в нем говорится, что они выполнялись каждые 5 минут, но за несколько дней до этого (хотя новые записи появляются каждые 57 секунд)

скриншот веб-приложения

Контекст

Я использую следующую версию:

  • Воздушный поток 1.10.12
  • Ubuntu 20.04
  • python 3.8.5

Комментарии:

1. Вы пробовали играть с 'start_date': days_ago(2), ? Скорее всего, это не так, потому что это не объясняет 57-секундную вещь, но если в вашем кластере включена функция «догнать по умолчанию», airflow попытается выполнить обратное заполнение за последние два дня. towardsdatascience.com / … объясняет кое-что из этого

Ответ №1:

Это изза свойства catchup_by_default DAG. По умолчанию при создании он запускает пакеты сначала как пакет, start_date schedule interval а затем выполняет итерацию для пакета на основе schedule interval . Поскольку это наверстывание упущенного, он попытается завершить все запуски как можно скорее ( max_active_runs ограничивая только количество пакетов, выполняемых в любой момент времени).

В вашем DAG начальная дата такова days_ago(2) , что она пытается догнать все пакеты между датой начала и сейчас. Вот почему вы видите запись для каждой даты выполнения.

Если вы не хотите, чтобы он выполнялся для прошлых пакетов, вы можете установить catchup как False