Планирование баз данных Airflow для запуска исключительно с понедельника по пятницу, то есть только в будние дни

#python #cron #airflow #airflow-scheduler #pendulum

#python #cron #поток #airflow-планировщик #маятник

Вопрос:

У меня есть база данных, выполняющая скрипт Python, который принимает аргумент date (текущая дата). Я планирую запуск базы данных в 6: 00 утра с понедельника по пятницу, то есть в будние дни по восточному стандартному времени. Группа баз данных должна запускать скрипт Python в понедельник с датой понедельника в качестве аргумента, то же самое для вторника вплоть до пятницы с датой пятницы в качестве аргумента.

Я заметил, что использование интервала расписания '0 6 * * 1-5' не сработало, потому что выполнение по пятницам не выполнялось до следующего понедельника.

Я изменил интервал расписания на '0 6 * * *' , чтобы запускать каждый день в 6: 00 утра, и в начале моей базы данных фильтровать даты, которые попадают в нее ‘0 6 * * 1-5’ , так что эффективно с понедельника по пятницу. Для субботы и воскресенья последующие задачи следует пропустить.

Это мой код

 from __future__ import print_function
import pendulum
import logging
from airflow.models import DAG
from airflow.models import Variable
from datetime import datetime, timedelta
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
from croniter import croniter


log = logging.getLogger(__name__)

def filter_processing_date(**context):
    execution_date = context['execution_date']
    cron = croniter('0 6 * * 1-5', execution_date)
    log.info('cron is: {}'.format(cron))
    log.info('execution date is: {}'.format(execution_date))
    #prev_date = cron.get_prev(datetime)
    #log.info('prev_date is: {}'.format(prev_date))
    return execution_date == cron.get_next(datetime).get_prev(datetime)


local_tz = pendulum.timezone("America/New_York")
# DAG parameters

default_args = {
    'owner': 'Managed Services',
    'depends_on_past': False,
    'start_date': datetime(2020, 8, 3, tzinfo=local_tz),
    'dagrun_timeout': None,
    'email': Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'provide_context': True,
    'retries': 12,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    'execute_python',
    schedule_interval='0 6 * * *',
    default_args=default_args
    ) as dag:

    start_dummy = DummyOperator(
        task_id='start',
        dag=dag
    )

    end_dummy = DummyOperator(
        task_id='end',
        trigger_rule=TriggerRule.NONE_FAILED,
        dag=dag
    )

    weekdays_only = ShortCircuitOperator(
        task_id='weekdays_only',
        python_callable=filter_processing_date,
        dag=dag
    )


    run_python = SSHOperator(
    ssh_conn_id="oci_connection",
    task_id='run_python',
    command='/usr/bin/python3  /home/sb/local/bin/runProcess.py -d {{ ds_nodash }}',
    dag=dag)


    start_dummy >> weekdays_only >> run_python >> end_dummy
  

К сожалению, задача weekdays_only завершается ошибкой с приведенным ниже сообщением об ошибке. Что происходит не так?

Сообщение об ошибке Airflow

Продолжение сообщения об ошибке Airflow

Версия Airflow: v1.10.9-composer

Python 3.

Ответ №1:

Мне удалось решить свою проблему, взломав кое-что вместе. Проверяет, является ли следующая дата выполнения будним днем, и возвращает true, если это так, или false в противном случае. Я вызываю функцию в ShortCircuitOperator, которая выполняет последующие задачи, если true, или пропускает их, если false.

Это мой код ниже, но я открыт для лучших решений.

 from __future__ import print_function
import pendulum
import logging
from airflow.models import DAG
from airflow.models import Variable
from datetime import datetime, timedelta
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule


log = logging.getLogger(__name__)


def checktheday(**context):
    next_execution_date = context['next_execution_date']
    log.info('next_execution_date is: {}'.format(next_execution_date))
    date_check = next_execution_date.weekday()
    log.info('date_check is: {}'.format(date_check))
    if date_check == 0 or date_check == 1 or date_check == 2 or date_check == 3 or date_check == 4:
        decision = True
    else:
        decision = False

    log.info('decision is: {}'.format(decision))
    return decision


local_tz = pendulum.timezone("America/New_York")
# DAG parameters

default_args = {
    'owner': 'Managed Services',
    'depends_on_past': False,
    'start_date': datetime(2020, 8, 3, tzinfo=local_tz),
    'dagrun_timeout': None,
    'email': Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'provide_context': True,
    'retries': 12,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    'execute_python',
    schedule_interval='0 6 * * *',
    default_args=default_args
    ) as dag:

    start_dummy = DummyOperator(
        task_id='start',
        dag=dag
    )

    end_dummy = DummyOperator(
        task_id='end',
        trigger_rule=TriggerRule.NONE_FAILED,
        dag=dag
    )

    weekdays_only = ShortCircuitOperator(
        task_id='weekdays_only',
        python_callable=checktheday,
        dag=dag
    )


    run_python = SSHOperator(
    ssh_conn_id="oci_connection",
    task_id='run_python',
    command='/usr/bin/python3  /home/sb/local/bin/runProcess.py -d {{ macros.ds_format(macros.ds_add(ds, 1), "%Y-%m-%d", "%Y%m%d") }}',
    dag=dag)


    start_dummy >> weekdays_only >> run_python >> end_dummy