Возвращает список задач из функции, которые должны выполняться последовательно в Airflow

#airflow #google-cloud-composer

#воздушный поток #google-cloud-composer

Вопрос:

Я хочу вернуть 2 или более задач из функции, которые должны выполняться последовательно в том месте, где они вставлены в зависимости, см. Ниже.

 t1 = PythonOperator()

def generate_tasks():
    t2 = PythonOperator()
    t3 = PythonOperator()
    return magic(t2, t3) # magic needed here (preferably)

t1 >> generate_tasks() # otherwise here
# desired result: t1 >> t2 >> t3
 

Это выполнимо? Насколько я понимаю, Airflow 2.0, похоже, достигает этого с помощью TaskGroup, но мы используем Google Composer, и 2.0 некоторое время не будет доступен.

Лучшее решение, которое я нашел:

 t1 = PythonOperator()

def generate_tasks():
    t2 = PythonOperator()
    t3 = PythonOperator()
    return [t2, t3]

tasks = generate_tasks()
t1 >> tasks[0] >> tasks[1]
 

Но мне бы очень хотелось, чтобы это было абстрагировано, поскольку это более или менее противоречит цели возврата нескольких операторов из одной функции. Мы хотим, чтобы это было единое целое, насколько известно конечному пользователю, даже если оно может состоять из 2 или более задач.

Как это сделать с помощью TaskGroup в Airflow 2.0:

 class Encryptor:
    def encrypt_and_archive(self):
        with TaskGroup("archive_and_encrypt") as section_1:
            encrypt = DummyOperator(task_id="encrypt")
            archive = BashOperator(task_id="archive", bash_command='echo 1')
            encrypt >> archive
        return section_1

with DAG(dag_id="example_return_task_group", start_date=days_ago(2), tags=["example"]) as dag:
    start = DummyOperator(task_id="start")
    encrypt_and_archive = Encryptor().encrypt_and_archive()
    end = DummyOperator(task_id='end')

             # 👇 single variable, containing two tasks
    start >> encrypt_and_archive >> end
 

Который создает следующий график:

Группа задач на графике

Возможно ли что-то подобное удаленно до версии 2.0?

Комментарии:

1. Как насчет перебора задач?

Ответ №1:

Вы не объяснили, что magic(t2, t3) это такое. TaskGroup — это строго функция пользовательского интерфейса, она не влияет на логику DAG. Согласно вашему описанию, кажется, что вы ищете определенную логику (иначе что это magic ?).

Я считаю, что это то, что вам нужно:

 default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 1, 24),
}
def generate_tasks():
    operator_list =[]
    for i in range(5): # Replace to generate the logic you wish to dynamically create tasks
        op = DummyOperator(task_id=f"t{str(i)}_task", dag=dag)
        if i>0:
            operator_list[i - 1] >> op
        operator_list.append(op)
    return operator_list

with DAG(
    dag_id='loop',
    default_args=default_args,
    schedule_interval=None,
) as dag:
    start_op = DummyOperator(task_id='start_task')
    end_op = DummyOperator(task_id='end_task')
    tasks = generate_tasks()
    start_op >> tasks[0]
    tasks[-1] >> end_op
 

введите описание изображения здесь

Вы можете заменить DummyOperator любым оператором, который вы хотите.

Комментарии:

1. Похоже, это приводит к тому же, что и мой обходной путь, поскольку функция возвращает только 2 задачи. Магия — это просто какая-то еще неопределенная магия. Мы не можем вернуть их в виде кортежа или списка, потому что тогда они будут выполняться параллельно, поэтому я спрашиваю, не пропустил ли я какие-либо другие варианты. Я еще не пробовал TaskGroup out, но, насколько я могу судить, мы должны иметь возможность использовать это для группировки зависимостей: airflow.apache.org/docs/apache-airflow/stable /…

2. группа задач @Frans — это просто функция пользовательского интерфейса. Пожалуйста, добавьте эскиз того, какую зависимость вы хотите между задачами. очень сложно понять, что вам нужно, без надлежащего примера желаемого результата.

3. Обновлен вопрос с помощью примера группы задач.

4. @Frans В вашей целевой группе есть только 2 жестко закодированных оператора. Это создает график с одной ветвью, идентичной тому, что я показал. Я предполагаю, что у вас более сложная логика (возможно, с несколькими ветвями посередине?) Пожалуйста, покажите ваши фактические ожидания. Если вы хотите показать это с помощью группы задач, тогда покажите нам изображение DAG, когда все группы задач широко открыты (как будто их там даже нет). Группа задач — это в основном настройка пользовательского интерфейса. Логика op>> task_group просто связывает op со всеми заголовками в группе задач. Мои ответы похожи на группу задач с 1 заголовком и 1 листом.

5. @Frans Если то, что вам действительно нужно, это взять start_op >> tasks[0] tasks[-1] >> end_op и каким-то образом создать start_op >> X >> end_op — это невозможно в Airflow <2.0, для этого вам придется обновиться. Как я объяснил, функциональность может быть получена, но не с тем же синтаксисом.