Как проверить различное время выполнения задачи в DAG с помощью внешнего датчика воздушного потока

#python #python-3.x #airflow #airflow-scheduler

Вопрос:

Представьте, что у меня есть DAG A, включающая некоторые задачи, и эти задачи зависят от какого-либо внешнего датчика для другой задачи в DAG B.

Например, я хочу проверить состояние задачи в DAG B в 10:00, и если этот запуск будет выполнен успешно, то задачи в DAG A могут быть запущены. Но теперь по какой-то причине задача в DAG B в 10:00 выполнена неудачно, но выполнение той же задачи в 11:00 выполнено успешно.

Проблема в том, что задачи в DAG A будут отложены навсегда, потому что задача в DAG B не удалась в 10:00. Но это нормально, если следующий запуск прошел успешно.

Как я могу реализовать такую вещь во внешнем датчике воздушного потока, который проверяет состояние следующего времени выполнения в другой DAG, и если это удастся, то мои задачи могут выполняться без проблем?

P. S: по некоторым причинам я не могу использовать повторную попытку!

Заранее спасибо.

Ответ №1:

Я сам нашел решение. Возможно, это не лучший подход, но он работает. В этом случае мы можем определить функцию on_failure_callback и установить тайм-аут для нашего внешнего датчика, и когда тайм-аут будет достигнут, мы проверим следующий запуск задачи в другой DAG, и если это удастся, мы установим статус нашего внешнего датчика, SUCCESS чтобы другие задачи, зависящие от этого датчика, могли выполняться без проблем. Это код для такого подхода:

 from airflow.utils.state import State from airflow.sensors.external_task_sensor import ExternalTaskSensor from airflow.exceptions import AirflowSensorTimeout from datetime import datetime, timedelta, timezone from airflow.api.common.experimental.get_task_instance import get_task_instance from dateutil.parser import parse from functools import partial  def _failure_callback(task_id, dag_id, execution_date, context):  if isinstance(context['exception'], AirflowSensorTimeout):  sensor_instance = context['task_instance']  next_execution_date = parse(context['ts'])   -(execution_date)   timedelta(hours=1)  ti = get_task_instance(dag_id=dag_id, task_id=task_id, execution_date=next_execution_date)  if ti.current_state() == 'success':  sensor_instance.set_state(State.SUCCESS)    sensor = ExternalTaskSensor(external_task_id='external_task_id',  task_id='sensor',  external_dag_id='external_dag_id',  execution_delta=timedelta(hours=-24)   timedelta(minutes=-30),  timeout=5,  on_failure_callback=partial(_failure_callback, 'external_task_id', 'external_dag_id', timedelta(hours=-24)   timedelta(minutes=-30)),  dag=dag)