#airflow
#воздушный поток
Вопрос:
У меня есть DAG, копирующий данные в S3 с помощью PySpark, как показано ниже:
...
bucket = 'my.bucket'
schema = 'my_schema'
table = 'my_table'
ymd = pendulum.parse('{{ execution_date }}').strftime('%Y%m%d')
spark_script = 'my_spark_script'
DEFAULT_ARGS = {
'owner': 'burgerphilia',
'start_date': '2020-09-01',
'on_failure_callback': alert.slack_fail_alert,
'depends_on_past': False
}
SPARK_STEPS = [
{
'Name': f'{schema}_{table}_step',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'sudo',
'spark-submit',
...
f's3://{bucket}/spark-script/{spark_script}.py',
'--ymd',
f'{ymd}'
]
}
}
]
def delete_s3_object(bucket, schema, table, ymd):
"""
:param bucket: bucket name
:type buket: str
:param schema: schema name(the same as hive schema)
:type schema: str
:param table: table name(the same as hive table)
:type table: str
:param ymd: date to delete, '%Y%m%d' format
:type ymd: str
"""
aws_hook = AwsHook(aws_conn_id='aws_conn')
session = aws_hook.get_session(region_name='ap-northeast-2')
s3 = session.resource('s3')
bucket = s3.Bucket(bucket)
bucket.objects.filter(Prefix=f'{schema}/{table}/ymd={ymd}/').delete()
with DAG(
dag_id=f'{schema}_{table}',
default_args=DEFAULT_ARGS,
catchup=False,
schedule_interval="40 06 * * *"
) as dag:
object_cleaner = PythonOperator(
task_id = 'delete_object',
python_callable=delete_s3_object,
op_kwargs={'bucket': bucket, 'schema': schema, 'table': table, ymd': ymd}
)
step_adder = EmrAddStepsOperator(
task_id='add_step',
job_flow_id=job_flow_id,
aws_conn_id='aws_conn',
steps=SPARK_STEPS,
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id=job_flow_id,
step_id="{{ task_instance.xcom_pull(task_ids='add_step', key='return_value')[0] }}",
aws_conn_id='aws_conn',
)
object_cleaner >> step_adder >> step_checker
Этот DAG работает ежедневно, но дело в том, что источник данных (Oracle DB) иногда обновляется. Поэтому я должен повторно запускать одну и ту же базу данных каждый понедельник и первый день месяца, чтобы обновить предыдущую (например, 2020/11/02, повторно запустить 2020/10/26 ~ 2020/11/01). Есть ли лучший способ справиться с этим?
Ответ №1:
Прямого способа сделать это нет. Вы можете попробовать 2 вещи:
- используйте динамический dag (https://www.astronomer.io/guides/dynamically-generating-dags /) чтобы создать 2 базы данных с разными schedule_interval.
- Создайте другой dag, который будет запускать этот dag в другом scheduler_interval.