#airflow
#воздушный поток
Вопрос:
Airflow упрощает выполнение заданий с фиксированными интервалами. В этом сообщении содержится совет о том, как справиться с более сложными требованиями к планированию.
Например, предположим, что у меня есть процесс, который извлекает файлы с SFTP-сервера и что-то с ними делает. Источник публикует только файлы M-F. Я хочу, чтобы dag вел себя следующим образом:
- запускайте только M-F;
- по понедельникам ищите файл из
execution_date - 0
и- 1
и- 2
- Вт-Пт, просто посмотрите
execution date - 0
?
Кажется, это непрактично реализовать, и то, что мне нужно сделать, это просто спроектировать его так, чтобы извлекать любые файлы, которые там есть, и запускать каждый день, без ссылки на конкретные файлы.
Дело в том, что если я могу указать файлы, управляемые execution_date
тогда, я могу точно видеть, что было извлечено, а что нет, и использовать функцию повтора.
Один из способов, который приходит на ум, — создать 7 баз данных, каждая с еженедельным расписанием. Но мне не нравится эта идея.
Другим случаем было бы, если бы я хотел, чтобы процесс запускался каждое второе воскресенье месяца. Есть ли какой-нибудь способ сделать что-то подобное?
Редактировать: Я думаю, что самым простым способом добиться этого будет разработать dag, чтобы всегда извлекать файлы с датой execution_date
, но просто не запускать запуски sat и sun до понедельника (и использовать для этого trigger dag operator), а также использовать dag контроллера с BranchOperator и TriggerDagOperator для достижения этой цели.
Ответ №1:
Установите DAG 'schedule_interval':
так, чтобы '0 0 * * 1-5'
они запускались в 00:00 каждый день недели с понедельника по пятницу. Отрегулируйте время по мере необходимости (первые два нуля).
Затем используйте BranchPythonOperator
как способ ввода для DAG. Таким образом, по понедельникам выполняется DAG и ищет файл с execution_date - 0
помощью , execution_date - 1
, и execution_date - 2
. Со вторника по пятницу он просто ищет execution_date - 0
.
Я создал краткий пример, чтобы показать вам, что я имею в виду. Я надеюсь, что это достаточный пример. Дайте мне знать, если я смогу помочь дальше.
#Airflow imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
# General imports
from datetime import datetime
DAG_ID = 'stackoverflow_exampledag'
args = {
'owner': 'you',
'email': ['you@yourwork.com'],
'depends_on_past': False,
'email_on_retry': False,
'email_on_failure': True,
'start_date': datetime(2019, 4, 14)
}
dag = DAG(
dag_id=DAG_ID,
default_args=args,
schedule_interval="0 0 * * 1-5"
)
#################################
######## Python Script ##########
#################################
def checktheday(**kwargs):
weekday = datetime.today().weekday()
if weekday == 1:
return 'monday_only_task'
else:
return 'tuesday_through_friday_task'
####################################
########## TASKS ###################
####################################
# BranchPythonOperator is the entry point for this DAG.
# The python callable will return the task id of the appriorate subdag/task that it's supposed to run.
checktheday_task = BranchPythonOperator(
task_id='checktheday_task',
python_callable=checktheday,
dag=dag,
provide_context=True
)
monday_only_task = DummyOperator(
task_id='monday_only_task',
dag=dag
)
tuesday_through_friday_task = DummyOperator(
task_id='tuesday_through_friday_task',
dag=dag
#################################
########## ORCHESTRATION ########
#################################
monday_only_task.set_upstream(checktheday_task)
tuesday_through_friday_task.set_upstream(checktheday_task)
Комментарии:
1. В настоящее время я думаю, что самый простой способ реализовать это — использовать оператор trigger dag, чтобы dag всегда искал файл с датой
execution_date
, а управляющий dag просто не планировал запуск sat и sun до понедельника2. @chorbs Я не уверен, что понимаю. Зачем использовать TriggerDagOperator? BranchPythonOperator выполнит все, что, я думаю, вы пытаетесь сделать. Вы можете просто добавить monday_only_task для обработки субботних / воскресных запусков.
3. Проблема в том, что каждый день я извлекаю 4 типа файлов на эту дату, и я хотел бы, чтобы каждый из них был отдельным экземпляром задачи. Следовательно, при таком подходе в моем dag будет 12 ti в каждом запуске dag, и 8 из них будут пропускаться ежедневно, и все они будут пропускаться по выходным. Это просто беспорядочно, когда думаешь о древовидном представлении. При использовании подхода trigger dag древовидное представление является чистым и аккуратно отображает историю, не искаженную требуемым индивидуальным планированием.
Ответ №2:
Ответ Зака был полезен в решении этой проблемы (потому что был необходим оператор перехода), но решение, которое я собираюсь использовать, — использовать TriggerDagRunOperator
.
Вот базы данных, которые я создал, чтобы протестировать этот подход.
Целевая база данных
def alert(ti, **kwargs):
message = f"Execution date is {ti.execution_date}"
print(message)
with target_dag:
PythonOperator(
python_callable=alert,
task_id='target_task',
provide_context=True,
)
Вызвать dag
def check_day(ti, **kwargs):
execution_date = ti.execution_date
if execution_date.minute % 7 == 0:
return ['weekday_trigger', 'saturday_trigger', 'sunday_trigger']
elif execution_date.minute % 7 in range(1, 5):
return ['weekday_trigger']
else:
return []
with trigger_dag:
check_day_task = BranchPythonOperator(
task_id='check_day_task',
python_callable=check_day,
provide_context=True,
)
weekday_trigger = TriggerDagRunOperator(
task_id='weekday_trigger',
trigger_dag_id='target_dag',
execution_date='{{ execution_date }}'
)
saturday_trigger = TriggerDagRunOperator(
task_id='saturday_trigger',
trigger_dag_id='target_dag',
execution_date='{{ execution_date macros.timedelta(days=-1) }}'
)
sunday_trigger = TriggerDagRunOperator(
task_id='sunday_trigger',
trigger_dag_id='target_dag',
execution_date='{{ execution_date macros.timedelta(days=-2) }}'
)
check_day_task >> [weekday_trigger, saturday_trigger, sunday_trigger]
Почему бы просто не использовать branch operator?
Причина, по которой я предпочитаю этот подход, заключается в том, что моему целевому dag не нужно заботиться о сложном планировании. Все, о чем нужно заботиться, это дата выполнения. Так получилось, что по понедельникам мы хотим выполнить execution_date - 1
и execution_date - 2
в дополнение к execution_date
. Но целевой dag работает одинаково, несмотря ни на что: он выполняет определенную вещь на основе execution_date
.
Если я попытаюсь включить оператор перехода в целевую базу данных, это очень быстро приведет к беспорядку. Например, если в вашей целевой базе данных есть 4 задачи, вам нужно дублировать их 2 раза по понедельникам. Кроме того, древовидное представление истории выполнения dag было бы уродливым, с большим количеством пропущенных задач, и обратная засыпка, вероятно, была бы странной.
Завершение
Итак, в будние дни, отличные от понедельника, наш триггер dag запускается target_dag
с той же датой выполнения, что и триггер dag. В выходные триггер dag ничего не запускает. И по понедельникам он выполняет 3 прогона target_dag
, для понедельника и предыдущих двух дней.
Примечание: я использовал минуты для имитации дней при тестировании запланированных запусков.
Вот вид графика из trigger_dag dag:
Древовидное представление основной задачи остается чистым и простым:
Комментарии:
1. Это был очень хороший итоговый пост. У меня возникли некоторые проблемы с пониманием вашего ответа на мой комментарий выше, но теперь я могу понять вашу логику. Спасибо!