#airflow
#воздушный поток
Вопрос:
Я хочу настроить задачу так, чтобы она зависела от дня недели в файле dag. Похоже, что макросы воздушного потока, подобные {{ next_execution_date }}
, недоступны напрямую в файле dag python. Это мое определение базы данных:
RG_TASKS = {
'asia': {
'start_date': pendulum.datetime.(2021,1,1,16,0,tzinfo='Asia/Tokyo'),
'tz': 'Asia/Tokyo',
'files': [
'/path/%Y%m%d/asia_file1.%Y%m%d.csv',
'/path/%Y%m%d/asia_file2.%Y%m%d.csv',
...], },
'euro': {
'start_date': pendulum.datetime.(2021,1,1,16,0,tzinfo='Europe/London'),
'tz': 'Europe/London',
'files': [
'/path/%Y%m%d/euro_file1.%Y%m%d.csv',
'/path/%Y%m%d/euro_file2.%Y%m%d.csv',
...], },
}
dag = DAG(..., start_date=pendulum.datetime.(2021,1,1,16,0,tzinfo='Asia/Tokyo'),
schedule='00 16 * * 0-6')
for rg, t in RG_TASKS.items():
tz = t['tz']
h = t['start_date'].hour
m = t['start_date'].minute
target_time = f'{{{{ next_execution_date.replace(tzinfo="{tz}", hour={h}, minute={m}) }}}}'
time_sensor = DateTimeSensor(dag=dag, task_id=f'wait_for_{rg}', tartget_time=target_time)
bash_task = BashOperator(dag=dag, task_id='load_{rg}', trigger_rule='all_success', depends_on_past=True, bash_command=...)
for fname in t['files']:
fpath = f'{{{{ next_execution_date.strftime("{fname}") }}}}'
task_id = os.path.basename(fname).split('.')[0]
file_sensor = FileSensor(dag=dag, task_id=task_id, filepath=fpath, ...)
file_sensor.set_upstream(time_sensor)
file_sensor.set_downstream(bash_task)
Вышеуказанное работает, и bash_task будет запущен, если все файлы доступны, и для него установлено значение depend_on_past=True. Однако у файлов немного другое расписание. {rg}_file1
будет доступно 6 дней в неделю, кроме субботы, в то время как остальные доступны 7 дней в неделю.
Один из вариантов — создать 2 базы данных, одна из которых запланирована для запуска Вс-Пт, а другая запланирована для запуска только в субботу. Но при использовании этой опции значение depends_on_past=True не выполняется в субботу.
Есть ли лучший способ сохранить depends_on_past=True 7 дней в неделю? В идеале в цикле файлов я мог бы сделать что-то вроде:
for fname in t['files']:
dt = ...
if dt.weekday()==5 and task_id==f'{rg}_file1':
continue
Ответ №1:
Обычно я думаю, что лучше выполнять что-то в одной задаче, когда это достаточно легко сделать, и в этом случае мне кажется, что вы можете.
Я не совсем уверен, почему вы используете датчик даты и времени, но это не кажется необходимым. Насколько я могу судить, вы просто хотите, чтобы ваш процесс запускался каждый день (в идеале после того, как файл будет там) и пропускался один раз в неделю.
Я думаю, мы тоже можем отказаться от файлового датчика.
Вариант 1: все в bash
Проверьте наличие в вашем скрипте bash и завершите работу с ошибкой (с повторными попытками), если отсутствует. Просто верните ненулевой код выхода, если файл отсутствует.
Тогда в вашем скрипте bash вы могли бы ничего не делать в день пропуска.
В дни пропуска ваша задача bash будет зеленой, даже если она ничего не сделала.
Вариант 2: оператор bash подкласса
Подкласс BashOperator и добавьте skip_day
параметр. Тогда ваше выполнение выглядит следующим образом:
def execute(self, context):
next_execution_date = context['next_execution_date']
if next_execution_date.day_of_week == self.skip_day:
raise AirflowSkipException(f'we skip on day {self.skip_day}')
super().execute(context)
При использовании этой опции ваш сценарий bash по-прежнему должен завершаться сбоем, если файл отсутствует, но ему не нужно иметь дело с логикой пропуска. И вы сможете увидеть, что задача пропущена в пользовательском интерфейсе.
В любом случае, никаких датчиков.
Другое примечание
Вы можете упростить создание шаблонов имен файлов.
files=[
'/path/{{ next_ds_nodash }}/euro_file2.{{ next_ds_nodash }}.csv',
...
]
Тогда вам не нужно возиться strftime
.
Комментарии:
1. Фактический путь / файл более сложный, чем %Y%m%d, поэтому strftime . Добавлен DateTimeSensor, поскольку у меня создается впечатление, что FileSensor может быть недостаточно эффективным. Мы не хотим, чтобы FileSensor постоянно проверял десятки файлов раньше, если мы знаем, что они будут доступны позже в тот же день в зависимости от часового пояса местного региона.
2. Файлы получены от поставщика и поставляются несколькими пакетами по регионам. В большинстве случаев они прибывают вовремя. Но иногда они могут опаздывать, и нам нужно сообщить, какой файл задерживается. Сценарий загрузки данных на python (который вызывает скрипт bash) знает о доступности файлов в какой день, и он обработает его должным образом, если все файлы в этот день доступны.
3. по сути, ответ, который я даю, «использовать
AirflowSkipException
«. это аналог воздушногоcontinue
потока в вашем примере цикла.