#python #airflow
#python #воздушный поток
Вопрос:
Я развернул Airflow 2.2.2 в режиме CeleryExecutor. Я создал DAG, который импортирует классы из пользовательского пакета. Когда я добавляю новый атрибут к этому импортированному классу, запускаю новую версию своего пакета, повторно развертываю Airflow с правильным пакетом новой версии, на который ссылается (в conda env), и добавляю ссылку на этот новый атрибут в моей базе данных, пользовательский интерфейс Airflow показывает ошибку импорта, указывая, что недавно созданныйдобавленный атрибут не существует в моем классе.
Что я уже протестировал:
- Жесткое кодирование значения атрибута в DAG привело к правильному анализу базы данных
- Я проверил, что правильная версия моего пользовательского пакета была установлена на контейнерах веб-сервера / планировщика
- Запуск python3.7 на моих контейнерах веб-сервера / планировщика (с активированной conda env) и импорт класса из моего пользовательского пакета привели к no
AttributeError
. - Повторное развертывание Airflow после развертывания моего пользовательского пакета ничего не исправило
- Удаление всех записей в
dag_code
serialized_dag
таблицах metedata и metedata не устранило проблему
Выводы на данный момент
Мне кажется, что это связано с тем, как Airflow 2.x обрабатывает сериализацию базы данных (https://airflow.apache.org/docs/apache-airflow/stable/dag-serialization.html ), и что каким-то образом предыдущая версия импортированного класса все еще где-то кэшируется. Любая помощь по этому вопросу будет высоко оценена