Как я могу периодически повторно запускать один и тот же DAG?

#airflow

#воздушный поток

Вопрос:

У меня есть DAG, копирующий данные в S3 с помощью PySpark, как показано ниже:

 ...
bucket = 'my.bucket'
schema = 'my_schema'
table = 'my_table'
ymd = pendulum.parse('{{ execution_date }}').strftime('%Y%m%d')
spark_script = 'my_spark_script'

DEFAULT_ARGS = {
    'owner': 'burgerphilia',
    'start_date': '2020-09-01',
    'on_failure_callback': alert.slack_fail_alert,
    'depends_on_past': False
}

SPARK_STEPS = [
    {
        'Name': f'{schema}_{table}_step',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'sudo',
                'spark-submit',
                 ...
                f's3://{bucket}/spark-script/{spark_script}.py',
                '--ymd',
                f'{ymd}'
            ]
        }
    }
]

def delete_s3_object(bucket, schema, table, ymd):
    """
    :param bucket: bucket name
    :type buket: str
    :param schema: schema name(the same as hive schema)
    :type schema: str
    :param table: table name(the same as hive table)
    :type table: str
    :param ymd: date to delete, '%Y%m%d' format
    :type ymd: str
    """
   
    aws_hook = AwsHook(aws_conn_id='aws_conn')
    session = aws_hook.get_session(region_name='ap-northeast-2')
    s3 = session.resource('s3')
    bucket = s3.Bucket(bucket)
    bucket.objects.filter(Prefix=f'{schema}/{table}/ymd={ymd}/').delete()

with DAG(
    dag_id=f'{schema}_{table}',
    default_args=DEFAULT_ARGS,
    catchup=False,
    schedule_interval="40 06 * * *"
) as dag:
    object_cleaner = PythonOperator(
        task_id = 'delete_object', 
        python_callable=delete_s3_object, 
        op_kwargs={'bucket': bucket, 'schema': schema, 'table': table, ymd': ymd}
    )

    step_adder = EmrAddStepsOperator(
        task_id='add_step',
        job_flow_id=job_flow_id, 
        aws_conn_id='aws_conn', 
        steps=SPARK_STEPS,
    )

    step_checker = EmrStepSensor(
        task_id='watch_step', 
        job_flow_id=job_flow_id, 
        step_id="{{ task_instance.xcom_pull(task_ids='add_step', key='return_value')[0] }}",
        aws_conn_id='aws_conn',
    )

    object_cleaner >> step_adder >> step_checker
  

Этот DAG работает ежедневно, но дело в том, что источник данных (Oracle DB) иногда обновляется. Поэтому я должен повторно запускать одну и ту же базу данных каждый понедельник и первый день месяца, чтобы обновить предыдущую (например, 2020/11/02, повторно запустить 2020/10/26 ~ 2020/11/01). Есть ли лучший способ справиться с этим?

Ответ №1:

Прямого способа сделать это нет. Вы можете попробовать 2 вещи:

  1. используйте динамический dag (https://www.astronomer.io/guides/dynamically-generating-dags /) чтобы создать 2 базы данных с разными schedule_interval.
  2. Создайте другой dag, который будет запускать этот dag в другом scheduler_interval.