#python #python-3.x #airflow #airflow-scheduler
Вопрос:
Представьте, что у меня есть DAG A, включающая некоторые задачи, и эти задачи зависят от какого-либо внешнего датчика для другой задачи в DAG B.
Например, я хочу проверить состояние задачи в DAG B в 10:00, и если этот запуск будет выполнен успешно, то задачи в DAG A могут быть запущены. Но теперь по какой-то причине задача в DAG B в 10:00 выполнена неудачно, но выполнение той же задачи в 11:00 выполнено успешно.
Проблема в том, что задачи в DAG A будут отложены навсегда, потому что задача в DAG B не удалась в 10:00. Но это нормально, если следующий запуск прошел успешно.
Как я могу реализовать такую вещь во внешнем датчике воздушного потока, который проверяет состояние следующего времени выполнения в другой DAG, и если это удастся, то мои задачи могут выполняться без проблем?
P. S: по некоторым причинам я не могу использовать повторную попытку!
Заранее спасибо.
Ответ №1:
Я сам нашел решение. Возможно, это не лучший подход, но он работает. В этом случае мы можем определить функцию on_failure_callback
и установить тайм-аут для нашего внешнего датчика, и когда тайм-аут будет достигнут, мы проверим следующий запуск задачи в другой DAG, и если это удастся, мы установим статус нашего внешнего датчика, SUCCESS
чтобы другие задачи, зависящие от этого датчика, могли выполняться без проблем. Это код для такого подхода:
from airflow.utils.state import State from airflow.sensors.external_task_sensor import ExternalTaskSensor from airflow.exceptions import AirflowSensorTimeout from datetime import datetime, timedelta, timezone from airflow.api.common.experimental.get_task_instance import get_task_instance from dateutil.parser import parse from functools import partial def _failure_callback(task_id, dag_id, execution_date, context): if isinstance(context['exception'], AirflowSensorTimeout): sensor_instance = context['task_instance'] next_execution_date = parse(context['ts']) -(execution_date) timedelta(hours=1) ti = get_task_instance(dag_id=dag_id, task_id=task_id, execution_date=next_execution_date) if ti.current_state() == 'success': sensor_instance.set_state(State.SUCCESS) sensor = ExternalTaskSensor(external_task_id='external_task_id', task_id='sensor', external_dag_id='external_dag_id', execution_delta=timedelta(hours=-24) timedelta(minutes=-30), timeout=5, on_failure_callback=partial(_failure_callback, 'external_task_id', 'external_dag_id', timedelta(hours=-24) timedelta(minutes=-30)), dag=dag)