Как протестировать операторы обратного потока on_failure_callback

#pytest #airflow

#pytest #воздушный поток

Вопрос:

У меня есть сценарий, в котором я хочу выполнить интеграционный тест для операторов, вызываемых через on_failure_callback , в базе данных Airflow.

Минимальный пример этого DAG выглядит следующим образом:

 def failure_callback(context):
    # CustomOperator in this case links to an external K8s service
    handle_failure = CustomOperator(
        task_id="handle_failure",
        timestamp=context["ts"]
    )

    handle_failure.execute(context=context)

args = {
    "catchup": False,
    "retries": 3,
    "retry_delay": timedelta(seconds=30),
    "start_date": START_DATE,
    "on_failure_callback": failure_callback,
}

with DAG("foo", schedule_interval=None, default_args=args) as dag:
    
    task_to_fail = SomeOperator()
  

Моей первой мыслью для тестирования было бы запустить task_to_fail , дать ему завершиться неудачей и подтвердить результат failure_callback с помощью какого-либо другого процесса, попытка ниже:

 import pytest
from airflow.models import DagBag, TaskInstance
from dateutil import parser

@pytest.fixture
def foo_dag():
    dag_id = "foo"
    dag_bag = DagBag("dags")
    return dag_bag.dags[dag_id]

@pytest.mark.integration
def test_task_to_fail(foo_dag):
    execution_date = parser.parse("2000-01-01T00:00 00:00")
    task_id = "task_to_fail"

    task = foo_dag.get_task(task_id=task_id)
    task_instance = TaskInstance(task, execution_date)

    with pytest.raises(Exception):
        task_instance.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)

    assert "INSERT DESIRED OUTCOME OF `failure_callback` HERE"
  

Проблема, с которой я сталкиваюсь, заключается в том, что не отображается, что failure_callback вызывается при запуске pytest . Я подозреваю, что это связано с тем, как вызывается TaskInstance (т. е. не выполняется on_failure_callback , но я не уверен.

Мои вопросы:

  1. Является ли это правильным способом проверки поведения этого обратного вызова? Если нет, то как это следует обрабатывать?
  2. Перед task_to_fail задачей есть много дорогостоящих операций, которые я хочу избежать выполнения во время тестов. Возможно ли выполнить полный запуск DAG, выполняемый с помощью pytest , начиная с конкретной задачи (в данном случае, task_to_fail ?

Комментарии:

1. Что вы пытаетесь протестировать, с функциональной точки зрения?

2. Да, если быть более наглядным: перед task_to_fail , находятся задачи, которые копируют данные на том K8s. task_to_fail сбой, оставляющий данные в томе. Я хочу on_failure_callback очистить этот том K8s. Я хочу, чтобы тест проверял функциональность CustomOperator очистки тома при сбое задачи в DAG. В настоящее время тест копирует некоторые фиктивные данные, я хочу, чтобы DAG запустился, произошел сбой, а затем утверждение подтвердит, что в томе K8s нет данных.