Ключи шаблона {{ prev_data_interval_end_success }} равны {{ dag_run.execution_date }} для некоторых запусков DAG воздушного потока

#airflow

#воздушный поток

Вопрос:

Я работаю над проблемой, из-за которой я не могу заставить DAG работать с правильным интервалом данных.

DAG имеют вид:

 CYCLE_START_SECONDS = 240 CYCLE_END_SECONDS = 120   @dag(schedule_interval=timedelta(seconds=CYCLE_START_SECONDS - CYCLE_END_SECONDS), start_date=datetime(2021, 11, 16),  catchup=True, default_args=DEFAULT_ARGS,  on_failure_callback=emit_on_task_failure, max_active_runs=1, on_success_callback=emit_on_dag_success,  render_template_as_native_obj=True) def ETL_Workflow(): """ Workflow to post process raw data into metrics for ingest into ES :return: """  @task() def get_start_date(start, end):  print(start, end)  end = int(end.timestamp())  if isinstance(start, pendulum.DateTime):  start = int(start.timestamp())  else:  start = end - CYCLE_START_SECONDS  return start, end  @task(execution_timeout=timedelta(seconds=CYCLE_START_SECONDS - CYCLE_END_SECONDS)) def run_query(start_end: tuple, query_template, conn_str, redis_key, transforms):  start, end = start_end  query = query_template.format(start=start, end=end)  return run_pipeline(query, conn_str, redis_key, transforms)  @task() def store_period_end(start_end: tuple):  _ = Variable.set(DAG_NAME   "_period_end", start_end[1])  return  start = '{{ prev_data_interval_end_success }}' end = '{{ dag_run.execution_date }}'  conn_str = get_source_url(SECRET) start_end = get_start_date(start, end) t1 = run_query(start_end, QUERY, conn_str, REDIS_KEY, TRANSFORMS) t3 = store_period_end(start_end) start_end gt;gt; t1 gt;gt; t3   dag = ETL_Workflow()  

В частности, я получаю нужные интервалы данных, используя эти шаблоны:

 start = '{{ prev_data_interval_end_success }}' end = '{{ dag_run.execution_date }}'  

Но затем по какой-то причине эти значения разрешаются на одну и ту же дату и время

 [2021-12-04, 18:42:20 UTC] {logging_mixin.py:109} INFO - start: 2021-12-04T18:40:18.451905 00:00 end: 2021-12-04 18:40:18.451905 00:00  

Однако вы можете видеть, что интервалы данных в метаданных запуска указаны правильно:

информация о запуске

Я в тупике. Датой выполнения DAG должна быть CYCLE_START_SECONDS после окончания интервала данных предыдущего запуска. Я провел модульное тестирование логики в get_start_date, и все в порядке. Более того, некоторые рабочие процессы не испытывают этой проблемы. Для этих рабочих процессов дата-время выполнения корректно рассчитывается как CYCLE_START_SECONDS после окончания предыдущего интервала данных. Я неправильно использую шаблоны? Я неправильно указываю расписание? Любые указания на то, в чем может быть проблема, будут оценены по достоинству. Спасибо.

Ответ №1:

Я думаю, что вы неправильно execution_date понимаете (что нормально, потому что это чертовски запутанная концепция). Запуск DAG происходит execution_date НЕ тогда, когда происходит запуск, а когда начали поступать данные, которые он должен обработать. Для запланированной на интервал DAG execution_date почти всегда равно data_interval_start , что , в свою очередь, почти всегда равно его предыдущему запуску DAG data_interval_end . Это означает , что execution_date это предыдущий запуск data_interval_end , а не интервал после. Поэтому, если предыдущий запуск пройдет успешно, вы увидите prev_data_interval_end_success , что равно execution_date . Это совершенно нормально.

Учитывая, что вы знаете о существовании prev_data_interval_end_success, вы, вероятно, также знаете, что execution_date это устаревшее, и именно потому, что концепция слишком запутанна. Не используйте его при написании новых DAG; вы, вероятно, ищете data_interval_end вместо этого.

Комментарии:

1. Спасибо. Переключение на data_interval_end устранило проблему, хотя мне пришлось добавить смещение, чтобы избежать запроса данных, которые не были заполнены из вышестоящих трубопроводов.