Воздушный поток: параллельное выполнение задач по цепочке

#python #parallel-processing #airflow

Вопрос:

В настоящее время я работаю над DAG, которая требует ежемесячного циклического выполнения длинного списка задач.

Чтобы достичь этого, я создаю пустой список, а затем перебираю несколько задач, меняя их идентификаторы задач в соответствии с новым месяцем.

Пример:

 from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.helpers import chain

dag = DAG(
    "import_trx_table",
    default_args=default_args,
    schedule_interval="45 9 * * *",
)

task_list = []

for month_ds in month_lst_ds:
    start = DummyOperator(task_id=f"dummy_start_trx_table_imports_{month_ds}", dag=dag)

    task_list.append(start)

...

chain(*task_list)
 

Проблема в том, что DAG сейчас очень длинная (и медленная) (так как я повторяю более 12 месяцев). Некоторые из этих задач определенно могут выполняться параллельно.

Я попытался проверить, есть ли у вспомогательной функции цепочки способ параллельного соединения, но ничего не смог найти.

Есть какие-нибудь предложения/идеи?

Спасибо.

Комментарии:

1. Не могли бы вы, пожалуйста, добавить эскиз, как, по вашему мнению, будет выглядеть DAG в пользовательском интерфейсе? Существует ли какая-либо зависимость между месяцами или все месяцы могут выполняться параллельно?

2. Между месяцами нет никакой зависимости. На самом деле все они могут выполняться одновременно с записью задач в ежемесячные разделы.

Ответ №1:

Вы можете использовать chain , но это на самом деле не имеет значения здесь.

 from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 6, 7),
}

dag = DAG(
    "stackoverflow_question",
    default_args=default_args,
    schedule_interval="@daily",
)

month_lst_ds = ['Dec', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov']

start_op = DummyOperator(task_id='start_task', dag=dag)
end_op = DummyOperator(task_id='end_task', dag=dag)
for month_ds in month_lst_ds:
    month_op = DummyOperator(task_id=f"dummy_start_trx_table_imports_{month_ds}", dag=dag)
    start_op >> month_op >> end_op 
 

Если вы хотите, вы можете заменить

 start_op >> month_op >> end_op 
 

с

 chain(start_op, month_op, end_op )
 

Вот как выглядит структура DAG в представлении графика:

введите описание изображения здесь

Комментарии:

1. Это выглядит очень хорошо. Я попробовал это с помощью DAG, но как только start_op завершает весь запуск, он также помечается как успешный и не выполняет фактические ежемесячные задачи. Есть идеи, почему?

2. Это еще один вопрос, который не связан со структурой DAG… для этого, пожалуйста, опубликуйте свой реальный код в новом вопросе. Ваш код, вероятно, не является DummyOperators

3. @ArturoBelano У меня иногда возникали проблемы с пропусками в подобных массивно-параллельных структурах, но я предполагал, что это связано либо с версией воздушного потока, либо с ограничениями на параллельную обработку воздушного потока. Элад прав в том, что отладка структуры должна быть отдельным вопросом, если только отображаемая им структура существенно не отличается от того, что вы намеревались.

4. @SarahMesser это верное замечание. Ответ Элада верен. Спасибо за помощь!