Не удается импортировать пользовательский макрос Python в Airflow 2

#python #python-3.x #airflow

#python #python-3.x #воздушный поток

Вопрос:

Я обновляю среду, в которой размещен Airflow, с Airflow 1.10.5 и Python 2.7 до Airflow 2.0.1 и Python 3.9.

Я пытаюсь снова заставить макросы работать:

/home/ubuntu/airflow/plugins/ben.py содержит:

 from airflow.plugins_manager import AirflowPlugin

def ret_one():
    return 1

class AirflowPluginsTest(AirflowPlugin):
    name = "ret_one"
    macros = [ret_one]
 

Я взял пример DAG из GitHub https://github.com/apache/airflow/blob/master/airflow/example_dags/example_python_operator.py и добавил from airflow.macros.ben import ret_one , но я получаю сообщение об ошибке:

 Broken DAG: [/home/ubuntu/airflow/dags/example_python_operator.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/home/ubuntu/airflow/dags/example_python_operator.py", line 6, in <module>
    from airflow.macros.ben import ret_one
ModuleNotFoundError: No module named 'airflow.macros.ben'
 

Что я здесь делаю не так? Я пробовал различные комбинации импорта (удаление ben и т. Д.).

Мой airflow.cfg содержит:

 [core]
...
plugins_folder = /home/ubuntu/airflow/plugins
 

и я перезапускаю все службы Airflow (через supervisord ) между изменениями. Я вижу, что ret_one плагин появляется в пользовательском интерфейсе Airflow (Admin -> Плагины).

Ответ №1:

Когда вы вызываете макрос в шаблоне, вы должны вызвать его следующим образом:

 {{ macros.PLUGIN_NAME.FUNCTION }}
 

В вашем случае PLUGIN_NAME — «ret_one». Итак, ваш вызов макроса в шаблоне будет выглядеть следующим образом:

 {{ macros.ret_one.ret_one() }}
 

https://airflow.apache.org/docs/apache-airflow/stable/plugins.html

Ответ №2:

Решение состоит в том, чтобы просто реализовать макрос независимо от любых библиотек Airflow, а затем импортировать его, как и любой класс / функцию в пути Python.