#airflow
#воздушный поток
Вопрос:
Я работаю над проблемой, из-за которой я не могу заставить DAG работать с правильным интервалом данных.
DAG имеют вид:
CYCLE_START_SECONDS = 240 CYCLE_END_SECONDS = 120 @dag(schedule_interval=timedelta(seconds=CYCLE_START_SECONDS - CYCLE_END_SECONDS), start_date=datetime(2021, 11, 16), catchup=True, default_args=DEFAULT_ARGS, on_failure_callback=emit_on_task_failure, max_active_runs=1, on_success_callback=emit_on_dag_success, render_template_as_native_obj=True) def ETL_Workflow(): """ Workflow to post process raw data into metrics for ingest into ES :return: """ @task() def get_start_date(start, end): print(start, end) end = int(end.timestamp()) if isinstance(start, pendulum.DateTime): start = int(start.timestamp()) else: start = end - CYCLE_START_SECONDS return start, end @task(execution_timeout=timedelta(seconds=CYCLE_START_SECONDS - CYCLE_END_SECONDS)) def run_query(start_end: tuple, query_template, conn_str, redis_key, transforms): start, end = start_end query = query_template.format(start=start, end=end) return run_pipeline(query, conn_str, redis_key, transforms) @task() def store_period_end(start_end: tuple): _ = Variable.set(DAG_NAME "_period_end", start_end[1]) return start = '{{ prev_data_interval_end_success }}' end = '{{ dag_run.execution_date }}' conn_str = get_source_url(SECRET) start_end = get_start_date(start, end) t1 = run_query(start_end, QUERY, conn_str, REDIS_KEY, TRANSFORMS) t3 = store_period_end(start_end) start_end gt;gt; t1 gt;gt; t3 dag = ETL_Workflow()
В частности, я получаю нужные интервалы данных, используя эти шаблоны:
start = '{{ prev_data_interval_end_success }}' end = '{{ dag_run.execution_date }}'
Но затем по какой-то причине эти значения разрешаются на одну и ту же дату и время
[2021-12-04, 18:42:20 UTC] {logging_mixin.py:109} INFO - start: 2021-12-04T18:40:18.451905 00:00 end: 2021-12-04 18:40:18.451905 00:00
Однако вы можете видеть, что интервалы данных в метаданных запуска указаны правильно:
Я в тупике. Датой выполнения DAG должна быть CYCLE_START_SECONDS после окончания интервала данных предыдущего запуска. Я провел модульное тестирование логики в get_start_date, и все в порядке. Более того, некоторые рабочие процессы не испытывают этой проблемы. Для этих рабочих процессов дата-время выполнения корректно рассчитывается как CYCLE_START_SECONDS после окончания предыдущего интервала данных. Я неправильно использую шаблоны? Я неправильно указываю расписание? Любые указания на то, в чем может быть проблема, будут оценены по достоинству. Спасибо.
Ответ №1:
Я думаю, что вы неправильно execution_date
понимаете (что нормально, потому что это чертовски запутанная концепция). Запуск DAG происходит execution_date
НЕ тогда, когда происходит запуск, а когда начали поступать данные, которые он должен обработать. Для запланированной на интервал DAG execution_date
почти всегда равно data_interval_start
, что , в свою очередь, почти всегда равно его предыдущему запуску DAG data_interval_end
. Это означает , что execution_date
это предыдущий запуск data_interval_end
, а не интервал после. Поэтому, если предыдущий запуск пройдет успешно, вы увидите prev_data_interval_end_success
, что равно execution_date
. Это совершенно нормально.
Учитывая, что вы знаете о существовании prev_data_interval_end_success, вы, вероятно, также знаете, что execution_date
это устаревшее, и именно потому, что концепция слишком запутанна. Не используйте его при написании новых DAG; вы, вероятно, ищете data_interval_end
вместо этого.
Комментарии:
1. Спасибо. Переключение на data_interval_end устранило проблему, хотя мне пришлось добавить смещение, чтобы избежать запроса данных, которые не были заполнены из вышестоящих трубопроводов.