Настройка задачи воздушного потока с датой выполнения

#airflow

#воздушный поток

Вопрос:

Я хочу настроить задачу так, чтобы она зависела от дня недели в файле dag. Похоже, что макросы воздушного потока, подобные {{ next_execution_date }} , недоступны напрямую в файле dag python. Это мое определение базы данных:

 RG_TASKS = {
    'asia': {
        'start_date': pendulum.datetime.(2021,1,1,16,0,tzinfo='Asia/Tokyo'),
        'tz': 'Asia/Tokyo',
        'files': [
            '/path/%Y%m%d/asia_file1.%Y%m%d.csv',
            '/path/%Y%m%d/asia_file2.%Y%m%d.csv',
            ...], },
    'euro': {
        'start_date': pendulum.datetime.(2021,1,1,16,0,tzinfo='Europe/London'),
        'tz': 'Europe/London',
        'files': [
            '/path/%Y%m%d/euro_file1.%Y%m%d.csv',
            '/path/%Y%m%d/euro_file2.%Y%m%d.csv',
            ...], },
}

dag = DAG(..., start_date=pendulum.datetime.(2021,1,1,16,0,tzinfo='Asia/Tokyo'),
    schedule='00 16 * * 0-6')

for rg, t in RG_TASKS.items():
    tz = t['tz']
    h = t['start_date'].hour
    m = t['start_date'].minute
    target_time = f'{{{{ next_execution_date.replace(tzinfo="{tz}", hour={h}, minute={m}) }}}}'
    time_sensor = DateTimeSensor(dag=dag, task_id=f'wait_for_{rg}', tartget_time=target_time)
    bash_task = BashOperator(dag=dag, task_id='load_{rg}', trigger_rule='all_success', depends_on_past=True, bash_command=...)
    for fname in t['files']:
        fpath = f'{{{{ next_execution_date.strftime("{fname}") }}}}'
        task_id = os.path.basename(fname).split('.')[0]
        file_sensor = FileSensor(dag=dag, task_id=task_id, filepath=fpath, ...)
        file_sensor.set_upstream(time_sensor)
        file_sensor.set_downstream(bash_task)
 

Вышеуказанное работает, и bash_task будет запущен, если все файлы доступны, и для него установлено значение depend_on_past=True. Однако у файлов немного другое расписание. {rg}_file1 будет доступно 6 дней в неделю, кроме субботы, в то время как остальные доступны 7 дней в неделю.

Один из вариантов — создать 2 базы данных, одна из которых запланирована для запуска Вс-Пт, а другая запланирована для запуска только в субботу. Но при использовании этой опции значение depends_on_past=True не выполняется в субботу.

Есть ли лучший способ сохранить depends_on_past=True 7 дней в неделю? В идеале в цикле файлов я мог бы сделать что-то вроде:

     for fname in t['files']:
        dt = ...
        if dt.weekday()==5 and task_id==f'{rg}_file1':
            continue
 

Ответ №1:

Обычно я думаю, что лучше выполнять что-то в одной задаче, когда это достаточно легко сделать, и в этом случае мне кажется, что вы можете.

Я не совсем уверен, почему вы используете датчик даты и времени, но это не кажется необходимым. Насколько я могу судить, вы просто хотите, чтобы ваш процесс запускался каждый день (в идеале после того, как файл будет там) и пропускался один раз в неделю.

Я думаю, мы тоже можем отказаться от файлового датчика.

Вариант 1: все в bash

Проверьте наличие в вашем скрипте bash и завершите работу с ошибкой (с повторными попытками), если отсутствует. Просто верните ненулевой код выхода, если файл отсутствует.

Тогда в вашем скрипте bash вы могли бы ничего не делать в день пропуска.

В дни пропуска ваша задача bash будет зеленой, даже если она ничего не сделала.

Вариант 2: оператор bash подкласса

Подкласс BashOperator и добавьте skip_day параметр. Тогда ваше выполнение выглядит следующим образом:

 def execute(self, context):
    next_execution_date = context['next_execution_date']
    if next_execution_date.day_of_week == self.skip_day:
        raise AirflowSkipException(f'we skip on day {self.skip_day}')
    super().execute(context)
 

При использовании этой опции ваш сценарий bash по-прежнему должен завершаться сбоем, если файл отсутствует, но ему не нужно иметь дело с логикой пропуска. И вы сможете увидеть, что задача пропущена в пользовательском интерфейсе.

В любом случае, никаких датчиков.

Другое примечание

Вы можете упростить создание шаблонов имен файлов.

 files=[
'/path/{{ next_ds_nodash }}/euro_file2.{{ next_ds_nodash }}.csv',
...
]
 

Тогда вам не нужно возиться strftime .

Комментарии:

1. Фактический путь / файл более сложный, чем %Y%m%d, поэтому strftime . Добавлен DateTimeSensor, поскольку у меня создается впечатление, что FileSensor может быть недостаточно эффективным. Мы не хотим, чтобы FileSensor постоянно проверял десятки файлов раньше, если мы знаем, что они будут доступны позже в тот же день в зависимости от часового пояса местного региона.

2. Файлы получены от поставщика и поставляются несколькими пакетами по регионам. В большинстве случаев они прибывают вовремя. Но иногда они могут опаздывать, и нам нужно сообщить, какой файл задерживается. Сценарий загрузки данных на python (который вызывает скрипт bash) знает о доступности файлов в какой день, и он обработает его должным образом, если все файлы в этот день доступны.

3. по сути, ответ, который я даю, «использовать AirflowSkipException «. это аналог воздушного continue потока в вашем примере цикла.