#python #python-3.x #airflow
#python #python-3.x #воздушный поток
Вопрос:
Я обновляю среду, в которой размещен Airflow, с Airflow 1.10.5 и Python 2.7 до Airflow 2.0.1 и Python 3.9.
Я пытаюсь снова заставить макросы работать:
/home/ubuntu/airflow/plugins/ben.py
содержит:
from airflow.plugins_manager import AirflowPlugin
def ret_one():
return 1
class AirflowPluginsTest(AirflowPlugin):
name = "ret_one"
macros = [ret_one]
Я взял пример DAG из GitHub https://github.com/apache/airflow/blob/master/airflow/example_dags/example_python_operator.py и добавил from airflow.macros.ben import ret_one
, но я получаю сообщение об ошибке:
Broken DAG: [/home/ubuntu/airflow/dags/example_python_operator.py] Traceback (most recent call last):
File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
File "/home/ubuntu/airflow/dags/example_python_operator.py", line 6, in <module>
from airflow.macros.ben import ret_one
ModuleNotFoundError: No module named 'airflow.macros.ben'
Что я здесь делаю не так? Я пробовал различные комбинации импорта (удаление ben
и т. Д.).
Мой airflow.cfg
содержит:
[core]
...
plugins_folder = /home/ubuntu/airflow/plugins
и я перезапускаю все службы Airflow (через supervisord
) между изменениями. Я вижу, что ret_one
плагин появляется в пользовательском интерфейсе Airflow (Admin -> Плагины).
Ответ №1:
Когда вы вызываете макрос в шаблоне, вы должны вызвать его следующим образом:
{{ macros.PLUGIN_NAME.FUNCTION }}
В вашем случае PLUGIN_NAME — «ret_one». Итак, ваш вызов макроса в шаблоне будет выглядеть следующим образом:
{{ macros.ret_one.ret_one() }}
https://airflow.apache.org/docs/apache-airflow/stable/plugins.html
Ответ №2:
Решение состоит в том, чтобы просто реализовать макрос независимо от любых библиотек Airflow, а затем импортировать его, как и любой класс / функцию в пути Python.