Вызываемая функция Airflow python многоразового использования

#python #airflow

#python #поток воздуха

Вопрос:

airflow_version = 1.10.2; python_version = 3.6.8

У меня возникли проблемы с пониманием того, как сделать вызываемую функцию python более многоразовой для PythonOperator от airflow, поскольку та же функция, объявленная в самом файле dag, работает, но импортировать ее из вспомогательной библиотеки не удается.

Итак, работает следующее:

 def my_function(temp_file, task_id, **kwargs):

    xcom_vals = kwargs['ti'].xcom_pull(task_ids=task_id)

    if not xcom_vals:
        return 'Xcom message not retrieved'

    ack_messages = []

    for item in xcom_vals:

        ack_messages  = <do stuff>

    return ack_messages

with DAG(<dag args>):

    process_messages = PythonOperator(
        task_id='get_messages',
        python_callable=my_function,
        op_kwargs={'task_id': 'previous_task_id',
                    'temp_file': temp_file},
        provide_context=True,
    )
  

Но, переместив my_function в модуль lib/helpers.py а затем ее импорт завершается ошибкой.

 Broken DAG: [path to dag] cannot import my_function
  

ПРИМЕЧАНИЕ: lib/helpers.py содержит другие функции (хотя и более простые), которые успешно импортируются и используются в текущей и других базах данных.

Как должна быть реализована my_function, чтобы ее можно было вызывать другими базами данных?

Комментарии:

1. У меня была похожая проблема, которая решила ее для меня, это обновить путь к плагину в файле airflow.cfg plugins_folder = /airflow /plugins

2. На самом деле для plugins_folder уже установлено значение $airflow_home/plugins. Вы говорите, что my_function должна быть реализована с помощью собственного пользовательского оператора?

3. @dorvak библиотеки — это подпуть к базам данных, поэтому структура папок есть airflow -- dags -- libs , а библиотеки — это модуль py, так что init .py есть :

4. в библиотеках @dorvak есть __init__.py , в моем предыдущем комментарии двойное подчеркивание было проанализировано как md. Дело в том, libs/helper.py содержит другие функции (хотя и более простые), которые успешно импортированы и используются в текущем и других базах данных!

5. Хорошо, извините за путаницу (удалил мои комментарии)

Ответ №1:

Разобрался с этим, дело дошло до того, что пользовательский интерфейс airflow и планировщик неправильно анализировали папки libs, какое-то поведение с задержкой после синхронизации git? Таким образом, и пользовательский интерфейс, и планировщик правильно проанализировали файл dag, но не папку lib.

Что в итоге решило это поведение, так это перезапуск как пользовательского интерфейса, так и модуля планировщика (мы запускаем airflow в kubernetes), и все.