Воздушный поток — создание динамических задач из XCOM

#python #airflow

#python #воздушный поток

Вопрос:

Я пытаюсь сгенерировать набор динамических задач из переменной XCOM. В XCOM я сохраняю список, и я хочу использовать каждый элемент списка для динамического создания последующей задачи.

Мой вариант использования заключается в том, что у меня есть вышестоящий оператор, который проверяет sftp-сервер на наличие файлов и возвращает список имен файлов, соответствующих определенным критериям. Я хочу создавать динамические задачи для каждого из возвращаемых имен файлов.

Я упростил его до приведенного ниже, и, хотя он работает, я чувствую, что это не идиоматическое решение для воздушного потока. В моем случае я бы написал функцию python, которая вызывается из оператора python, который извлекает значение из xcom и возвращает его, вместо использования функции pusher .

Я понимаю, что, хотя я могу создать пользовательский оператор, который сочетает в себе оба, я не думаю, что создание одноразового оператора является хорошей практикой, и я надеюсь, что есть другое решение.

 from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    "owner": "test",
    "depends_on_past": False,
    "start_date": datetime(2018, 10, 27),
    "email": ["test@mctest.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "email_on_success": False,
    "retries": 0,
    "provide_context": True
}

dag = DAG("test",  default_args=default_args, schedule_interval="@daily", catchup=False)


def pusher(**context):
    return ['a', 'b', 'c', 'd', 'e']

pusher_task = PythonOperator(
    task_id='pusher_task',
    dag=dag,
    python_callable=pusher  
)

def bash_wrapper(task, **context):
    return BashOperator(
        task_id='dynamic' task,
        dag=dag,
        bash_command='date'
    )

end = BashOperator(task_id='end', dag=dag, bash_command='echo task has ended')


pusher_task >> [bash_wrapper(task) for task in pusher()] >> end
  

Ответ №1:

Я бы не стал делать то, чего вы пытаетесь достичь, главным образом потому, что:

  1. Значение XCOM — это состояние, сгенерированное во время выполнения
  2. Структура базы данных определяется во время синтаксического анализа

Даже если вы используете что-то вроде следующего, чтобы получить доступ к значениям XCOM, сгенерированным какой-либо вышестоящей задачей:

 from airflow.models import TaskInstance
from airflow.utils.db import provide_session

dag = DAG(...)

@provide_session
def get_files_list(session):
    execution_date = dag.previous_schedule(datetime.now())

    // Find previous task instance:
    ti = session.query(TaskInstance).filter(
        TaskInstance.dag_id == dag.dag_id,
        TaskInstance.execution_date == execution_date,
        TaskInstance.task_id == upstream_task_id).first()
    if ti:
        files_list = ti.xcom_pull()
        if files_list:
            return files_list
    // Return default state:
    return {...}


files_list = get_files_list()
// Generate tasks based on upstream task state:
task = PythonOperator(
    ...
    xcom_push=True,
    dag=dag)
  

Но это будет вести себя очень странно, потому что синтаксический анализ DAG и выполнение задачи не синхронизированы так, как вы хотите.

Если основной причиной, по которой вы хотите это сделать, является распараллеливание обработки файлов, у меня было бы некоторое статическое количество задач обработки (определяемое требуемым параллелизмом), которые считывают список файлов из значения XCOM вышестоящей задачи и работают с соответствующей частью этого списка.

Другой вариант — распараллеливание обработки файлов с использованием какой-либо платформы для распределенных вычислений, такой как Apache Spark.

Комментарии:

1. Основными причинами для этого являются 1. Параллелизм, 2. Я не могу контролировать, сколько файлов / или данных в списке я получаю от вышестоящей задачи, но я хочу убедиться, что все файлы проходят через нижестоящие задачи

2. При первом подходе, о котором вы упомянули, предположим, что имеется 5 файлов и две задачи обработки. После того, как задачи обработки обрабатывают file1 и file2 из списка, как я могу снова запустить ту же задачу для file3 и file4? И, в конце концов, как я могу запустить только 1 задачу обработки для file5? Любой пример в этом направлении полезен

3. @nightgaunt каждая задача обработки занимает часть files массива, используя: files[int(task_idx * len(files) / parallelism):int((task_idx 1) * len(files) / parallelism)] , таким образом, первая задача получает файлы 1-2, а вторая — файлы с 3 по 5.

4. Но одна задача должна обрабатывать несколько файлов путем итерации в рамках одного оператора. Правильно? Для этого требуется пользовательский оператор, который выполняет одну и ту же операцию дважды / трижды. Не воспринимайте это плохо. Ваше решение имеет смысл и ближе всего к ответу. Но если есть способ заставить задачу работать только с одним файлом, я готов пойти с этим вариантом.

5. Решение для использования вложенных тегов и динамической генерации задач назревает в моей голове уже некоторое время. Но ваше объяснение о том, что xcom — это время выполнения, а структура — время анализа, заставляет меня колебаться.

Ответ №2:

Самый простой способ, который я могу придумать, это использовать оператор перехода. https://github.com/apache/airflow/blob/master/airflow/example_dags/example_branch_operator.py

Комментарии:

1. Можете ли вы уточнить? Как BranchOperator помогает получить доступ к переменным xcom и установить для них динамические задачи по потоку?

2. В вашем примере показано, как программно создавать ветви — вопрос заключался в том, как динамически создавать задачи в зависимости от результата, полученного предыдущей задачей во время выполнения