как добиться более сложного планирования dag с помощью airflow?

#airflow

#воздушный поток

Вопрос:

Airflow упрощает выполнение заданий с фиксированными интервалами. В этом сообщении содержится совет о том, как справиться с более сложными требованиями к планированию.

Например, предположим, что у меня есть процесс, который извлекает файлы с SFTP-сервера и что-то с ними делает. Источник публикует только файлы M-F. Я хочу, чтобы dag вел себя следующим образом:

  • запускайте только M-F;
  • по понедельникам ищите файл из execution_date - 0 и - 1 и - 2
  • Вт-Пт, просто посмотрите execution date - 0 ?

Кажется, это непрактично реализовать, и то, что мне нужно сделать, это просто спроектировать его так, чтобы извлекать любые файлы, которые там есть, и запускать каждый день, без ссылки на конкретные файлы.

Дело в том, что если я могу указать файлы, управляемые execution_date тогда, я могу точно видеть, что было извлечено, а что нет, и использовать функцию повтора.

Один из способов, который приходит на ум, — создать 7 баз данных, каждая с еженедельным расписанием. Но мне не нравится эта идея.

Другим случаем было бы, если бы я хотел, чтобы процесс запускался каждое второе воскресенье месяца. Есть ли какой-нибудь способ сделать что-то подобное?

Редактировать: Я думаю, что самым простым способом добиться этого будет разработать dag, чтобы всегда извлекать файлы с датой execution_date , но просто не запускать запуски sat и sun до понедельника (и использовать для этого trigger dag operator), а также использовать dag контроллера с BranchOperator и TriggerDagOperator для достижения этой цели.

Ответ №1:

Установите DAG 'schedule_interval': так, чтобы '0 0 * * 1-5' они запускались в 00:00 каждый день недели с понедельника по пятницу. Отрегулируйте время по мере необходимости (первые два нуля).

Затем используйте BranchPythonOperator как способ ввода для DAG. Таким образом, по понедельникам выполняется DAG и ищет файл с execution_date - 0 помощью , execution_date - 1 , и execution_date - 2 . Со вторника по пятницу он просто ищет execution_date - 0 .

Я создал краткий пример, чтобы показать вам, что я имею в виду. Я надеюсь, что это достаточный пример. Дайте мне знать, если я смогу помочь дальше.

 #Airflow imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
# General imports
from datetime import datetime

DAG_ID = 'stackoverflow_exampledag'

args = {
    'owner': 'you',
    'email': ['you@yourwork.com'],
    'depends_on_past': False,
    'email_on_retry': False,
    'email_on_failure': True,
    'start_date': datetime(2019, 4, 14)
}

dag = DAG(
    dag_id=DAG_ID,
    default_args=args,
    schedule_interval="0 0 * * 1-5"
    )


#################################
######## Python Script ##########
#################################


def checktheday(**kwargs):
    weekday = datetime.today().weekday()
    if weekday == 1:
        return 'monday_only_task'
    else:
        return 'tuesday_through_friday_task'


####################################
########## TASKS ###################
####################################

# BranchPythonOperator is the entry point for this DAG.
# The python callable will return the task id of the appriorate subdag/task that it's supposed to run.

checktheday_task = BranchPythonOperator(
    task_id='checktheday_task',
    python_callable=checktheday,
    dag=dag,
    provide_context=True
    )

monday_only_task = DummyOperator(
    task_id='monday_only_task',
    dag=dag
    )

tuesday_through_friday_task = DummyOperator(
    task_id='tuesday_through_friday_task',
    dag=dag


#################################
########## ORCHESTRATION ########
#################################
monday_only_task.set_upstream(checktheday_task)
tuesday_through_friday_task.set_upstream(checktheday_task)
  

примерное представление графика

Комментарии:

1. В настоящее время я думаю, что самый простой способ реализовать это — использовать оператор trigger dag, чтобы dag всегда искал файл с датой execution_date , а управляющий dag просто не планировал запуск sat и sun до понедельника

2. @chorbs Я не уверен, что понимаю. Зачем использовать TriggerDagOperator? BranchPythonOperator выполнит все, что, я думаю, вы пытаетесь сделать. Вы можете просто добавить monday_only_task для обработки субботних / воскресных запусков.

3. Проблема в том, что каждый день я извлекаю 4 типа файлов на эту дату, и я хотел бы, чтобы каждый из них был отдельным экземпляром задачи. Следовательно, при таком подходе в моем dag будет 12 ti в каждом запуске dag, и 8 из них будут пропускаться ежедневно, и все они будут пропускаться по выходным. Это просто беспорядочно, когда думаешь о древовидном представлении. При использовании подхода trigger dag древовидное представление является чистым и аккуратно отображает историю, не искаженную требуемым индивидуальным планированием.

Ответ №2:

Ответ Зака был полезен в решении этой проблемы (потому что был необходим оператор перехода), но решение, которое я собираюсь использовать, — использовать TriggerDagRunOperator .

Вот базы данных, которые я создал, чтобы протестировать этот подход.

Целевая база данных

 def alert(ti, **kwargs):
    message = f"Execution date is {ti.execution_date}"
    print(message)

with target_dag:
    PythonOperator(
        python_callable=alert,
        task_id='target_task',
        provide_context=True,
    )
  

Вызвать dag

 def check_day(ti, **kwargs):
    execution_date = ti.execution_date
    if execution_date.minute % 7 == 0:
        return ['weekday_trigger', 'saturday_trigger', 'sunday_trigger']
    elif execution_date.minute % 7 in range(1, 5):
        return ['weekday_trigger']
    else:
        return []

with trigger_dag:
    check_day_task = BranchPythonOperator(
        task_id='check_day_task',
        python_callable=check_day,
        provide_context=True,
    )

    weekday_trigger = TriggerDagRunOperator(
        task_id='weekday_trigger',
        trigger_dag_id='target_dag',
        execution_date='{{ execution_date }}'
    )
    saturday_trigger = TriggerDagRunOperator(
        task_id='saturday_trigger',
        trigger_dag_id='target_dag',
        execution_date='{{ execution_date   macros.timedelta(days=-1) }}'
    )
    sunday_trigger = TriggerDagRunOperator(
        task_id='sunday_trigger',
        trigger_dag_id='target_dag',
        execution_date='{{ execution_date   macros.timedelta(days=-2) }}'
    )

    check_day_task >> [weekday_trigger, saturday_trigger, sunday_trigger]
  

Почему бы просто не использовать branch operator?

Причина, по которой я предпочитаю этот подход, заключается в том, что моему целевому dag не нужно заботиться о сложном планировании. Все, о чем нужно заботиться, это дата выполнения. Так получилось, что по понедельникам мы хотим выполнить execution_date - 1 и execution_date - 2 в дополнение к execution_date . Но целевой dag работает одинаково, несмотря ни на что: он выполняет определенную вещь на основе execution_date .

Если я попытаюсь включить оператор перехода в целевую базу данных, это очень быстро приведет к беспорядку. Например, если в вашей целевой базе данных есть 4 задачи, вам нужно дублировать их 2 раза по понедельникам. Кроме того, древовидное представление истории выполнения dag было бы уродливым, с большим количеством пропущенных задач, и обратная засыпка, вероятно, была бы странной.

Завершение

Итак, в будние дни, отличные от понедельника, наш триггер dag запускается target_dag с той же датой выполнения, что и триггер dag. В выходные триггер dag ничего не запускает. И по понедельникам он выполняет 3 прогона target_dag , для понедельника и предыдущих двух дней.

Примечание: я использовал минуты для имитации дней при тестировании запланированных запусков.

Вот вид графика из trigger_dag dag: графическое представление триггерной dag

Древовидное представление основной задачи остается чистым и простым: древовидное представление целевой базы данных

Комментарии:

1. Это был очень хороший итоговый пост. У меня возникли некоторые проблемы с пониманием вашего ответа на мой комментарий выше, но теперь я могу понять вашу логику. Спасибо!