DagBag не заполняет dag, как ожидалось

#airflow #directed-acyclic-graphs

Вопрос:

Я хочу проверить свои dag, чтобы убедиться, что у них есть определенные аргументы по умолчанию, а также убедиться, что во всех dag нет ошибок импорта.

Я использую DagBag для заполнения групп dag, а затем повторяю каждую группу dag и проверяю значения каждой группы dag, чтобы убедиться, что они такие, какими я хочу их видеть.

Поскольку DagBag может также извлекать примеры dag, которые поставляются с воздушным потоком, я передаю аргумент include_example = False , однако, когда я это делаю, я понимаю, что ни один из моих dag не помещается в dagbags.

Я неправильно использую DagBag? или есть другой лучший способ извлекать и проверять dag при тестировании?

Мой код

 def test_no_import_errors():
    dag_bag = DagBag(include_examples=False)
    assert len(dag_bag.import_errors) == 0, "No Import Failures"
 

Ответ №1:

Мне удалось воспроизвести проблему, при создании DagBag объекта, если вы не предоставите значение dag_folder параметру, в коллекцию не будет добавлена группа DAG.

Так что, как заявил Ярек, это работает:

 def test_no_import_errors():
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    assert len(dag_bag.import_errors) == 0, "No Import Failures"
 

Вот пример, который я привел, чтобы проверить это:

 # python -m unittest test_dag_validation.py 
import unittest
import logging
from airflow.models import DagBag


class TestDAGValidation(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        log = logging.getLogger()
        handler = logging.FileHandler("dag_validation.log", mode="w")

        handler.setLevel(logging.INFO)
        log.addHandler(handler)
        cls.log = log

    def test_no_import_errors(self):
        dag_bag = DagBag(dag_folder="dags/", include_examples=False)
        self.log.info(f"How Many DAGs?: {dag_bag.size()}")
        self.log.info(f"Import errors: {len(dag_bag.import_errors)}")
        assert len(dag_bag.import_errors) == 0, "No Import Failures"


 

Ответ №2:

При создании объектов DagBag вы можете передать список папок, в которых DagBag должен искать файлы dag. Я думаю, в этом-то и проблема