Кэш сериализации базы данных Airflow

#python #airflow

#python #воздушный поток

Вопрос:

Я развернул Airflow 2.2.2 в режиме CeleryExecutor. Я создал DAG, который импортирует классы из пользовательского пакета. Когда я добавляю новый атрибут к этому импортированному классу, запускаю новую версию своего пакета, повторно развертываю Airflow с правильным пакетом новой версии, на который ссылается (в conda env), и добавляю ссылку на этот новый атрибут в моей базе данных, пользовательский интерфейс Airflow показывает ошибку импорта, указывая, что недавно созданныйдобавленный атрибут не существует в моем классе.

Что я уже протестировал:

  • Жесткое кодирование значения атрибута в DAG привело к правильному анализу базы данных
  • Я проверил, что правильная версия моего пользовательского пакета была установлена на контейнерах веб-сервера / планировщика
  • Запуск python3.7 на моих контейнерах веб-сервера / планировщика (с активированной conda env) и импорт класса из моего пользовательского пакета привели к no AttributeError .
  • Повторное развертывание Airflow после развертывания моего пользовательского пакета ничего не исправило
  • Удаление всех записей в dag_code serialized_dag таблицах metedata и metedata не устранило проблему

Выводы на данный момент

Мне кажется, что это связано с тем, как Airflow 2.x обрабатывает сериализацию базы данных (https://airflow.apache.org/docs/apache-airflow/stable/dag-serialization.html ), и что каким-то образом предыдущая версия импортированного класса все еще где-то кэшируется. Любая помощь по этому вопросу будет высоко оценена