#airflow #directed-acyclic-graphs
Вопрос:
Я хочу проверить свои dag, чтобы убедиться, что у них есть определенные аргументы по умолчанию, а также убедиться, что во всех dag нет ошибок импорта.
Я использую DagBag для заполнения групп dag, а затем повторяю каждую группу dag и проверяю значения каждой группы dag, чтобы убедиться, что они такие, какими я хочу их видеть.
Поскольку DagBag может также извлекать примеры dag, которые поставляются с воздушным потоком, я передаю аргумент include_example = False
, однако, когда я это делаю, я понимаю, что ни один из моих dag не помещается в dagbags.
Я неправильно использую DagBag? или есть другой лучший способ извлекать и проверять dag при тестировании?
Мой код
def test_no_import_errors():
dag_bag = DagBag(include_examples=False)
assert len(dag_bag.import_errors) == 0, "No Import Failures"
Ответ №1:
Мне удалось воспроизвести проблему, при создании DagBag
объекта, если вы не предоставите значение dag_folder
параметру, в коллекцию не будет добавлена группа DAG.
Так что, как заявил Ярек, это работает:
def test_no_import_errors():
dag_bag = DagBag(dag_folder="dags/", include_examples=False)
assert len(dag_bag.import_errors) == 0, "No Import Failures"
Вот пример, который я привел, чтобы проверить это:
# python -m unittest test_dag_validation.py
import unittest
import logging
from airflow.models import DagBag
class TestDAGValidation(unittest.TestCase):
@classmethod
def setUpClass(cls):
log = logging.getLogger()
handler = logging.FileHandler("dag_validation.log", mode="w")
handler.setLevel(logging.INFO)
log.addHandler(handler)
cls.log = log
def test_no_import_errors(self):
dag_bag = DagBag(dag_folder="dags/", include_examples=False)
self.log.info(f"How Many DAGs?: {dag_bag.size()}")
self.log.info(f"Import errors: {len(dag_bag.import_errors)}")
assert len(dag_bag.import_errors) == 0, "No Import Failures"
Ответ №2:
При создании объектов DagBag вы можете передать список папок, в которых DagBag должен искать файлы dag. Я думаю, в этом-то и проблема