#python #airflow
#python #поток воздуха
Вопрос:
airflow_version = 1.10.2; python_version = 3.6.8
У меня возникли проблемы с пониманием того, как сделать вызываемую функцию python более многоразовой для PythonOperator от airflow, поскольку та же функция, объявленная в самом файле dag, работает, но импортировать ее из вспомогательной библиотеки не удается.
Итак, работает следующее:
def my_function(temp_file, task_id, **kwargs):
xcom_vals = kwargs['ti'].xcom_pull(task_ids=task_id)
if not xcom_vals:
return 'Xcom message not retrieved'
ack_messages = []
for item in xcom_vals:
ack_messages = <do stuff>
return ack_messages
with DAG(<dag args>):
process_messages = PythonOperator(
task_id='get_messages',
python_callable=my_function,
op_kwargs={'task_id': 'previous_task_id',
'temp_file': temp_file},
provide_context=True,
)
Но, переместив my_function в модуль lib/helpers.py а затем ее импорт завершается ошибкой.
Broken DAG: [path to dag] cannot import my_function
ПРИМЕЧАНИЕ: lib/helpers.py содержит другие функции (хотя и более простые), которые успешно импортируются и используются в текущей и других базах данных.
Как должна быть реализована my_function, чтобы ее можно было вызывать другими базами данных?
Комментарии:
1. У меня была похожая проблема, которая решила ее для меня, это обновить путь к плагину в файле airflow.cfg plugins_folder = /airflow /plugins
2. На самом деле для plugins_folder уже установлено значение $airflow_home/plugins. Вы говорите, что my_function должна быть реализована с помощью собственного пользовательского оператора?
3. @dorvak библиотеки — это подпуть к базам данных, поэтому структура папок есть
airflow -- dags -- libs
, а библиотеки — это модуль py, так что init .py есть :4. в библиотеках @dorvak есть
__init__.py
, в моем предыдущем комментарии двойное подчеркивание было проанализировано как md. Дело в том, libs/helper.py содержит другие функции (хотя и более простые), которые успешно импортированы и используются в текущем и других базах данных!5. Хорошо, извините за путаницу (удалил мои комментарии)
Ответ №1:
Разобрался с этим, дело дошло до того, что пользовательский интерфейс airflow и планировщик неправильно анализировали папки libs, какое-то поведение с задержкой после синхронизации git? Таким образом, и пользовательский интерфейс, и планировщик правильно проанализировали файл dag, но не папку lib.
Что в итоге решило это поведение, так это перезапуск как пользовательского интерфейса, так и модуля планировщика (мы запускаем airflow в kubernetes), и все.