#pytest #airflow
#pytest #воздушный поток
Вопрос:
У меня есть сценарий, в котором я хочу выполнить интеграционный тест для операторов, вызываемых через on_failure_callback
, в базе данных Airflow.
Минимальный пример этого DAG выглядит следующим образом:
def failure_callback(context):
# CustomOperator in this case links to an external K8s service
handle_failure = CustomOperator(
task_id="handle_failure",
timestamp=context["ts"]
)
handle_failure.execute(context=context)
args = {
"catchup": False,
"retries": 3,
"retry_delay": timedelta(seconds=30),
"start_date": START_DATE,
"on_failure_callback": failure_callback,
}
with DAG("foo", schedule_interval=None, default_args=args) as dag:
task_to_fail = SomeOperator()
Моей первой мыслью для тестирования было бы запустить task_to_fail
, дать ему завершиться неудачей и подтвердить результат failure_callback
с помощью какого-либо другого процесса, попытка ниже:
import pytest
from airflow.models import DagBag, TaskInstance
from dateutil import parser
@pytest.fixture
def foo_dag():
dag_id = "foo"
dag_bag = DagBag("dags")
return dag_bag.dags[dag_id]
@pytest.mark.integration
def test_task_to_fail(foo_dag):
execution_date = parser.parse("2000-01-01T00:00 00:00")
task_id = "task_to_fail"
task = foo_dag.get_task(task_id=task_id)
task_instance = TaskInstance(task, execution_date)
with pytest.raises(Exception):
task_instance.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
assert "INSERT DESIRED OUTCOME OF `failure_callback` HERE"
Проблема, с которой я сталкиваюсь, заключается в том, что не отображается, что failure_callback
вызывается при запуске pytest
. Я подозреваю, что это связано с тем, как вызывается TaskInstance (т. е. не выполняется on_failure_callback
, но я не уверен.
Мои вопросы:
- Является ли это правильным способом проверки поведения этого обратного вызова? Если нет, то как это следует обрабатывать?
- Перед
task_to_fail
задачей есть много дорогостоящих операций, которые я хочу избежать выполнения во время тестов. Возможно ли выполнить полный запуск DAG, выполняемый с помощьюpytest
, начиная с конкретной задачи (в данном случае,task_to_fail
?
Комментарии:
1. Что вы пытаетесь протестировать, с функциональной точки зрения?
2. Да, если быть более наглядным: перед
task_to_fail
, находятся задачи, которые копируют данные на том K8s.task_to_fail
сбой, оставляющий данные в томе. Я хочуon_failure_callback
очистить этот том K8s. Я хочу, чтобы тест проверял функциональностьCustomOperator
очистки тома при сбое задачи в DAG. В настоящее время тест копирует некоторые фиктивные данные, я хочу, чтобы DAG запустился, произошел сбой, а затем утверждение подтвердит, что в томе K8s нет данных.