#python #parallel-processing #airflow
Вопрос:
В настоящее время я работаю над DAG, которая требует ежемесячного циклического выполнения длинного списка задач.
Чтобы достичь этого, я создаю пустой список, а затем перебираю несколько задач, меняя их идентификаторы задач в соответствии с новым месяцем.
Пример:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.helpers import chain
dag = DAG(
"import_trx_table",
default_args=default_args,
schedule_interval="45 9 * * *",
)
task_list = []
for month_ds in month_lst_ds:
start = DummyOperator(task_id=f"dummy_start_trx_table_imports_{month_ds}", dag=dag)
task_list.append(start)
...
chain(*task_list)
Проблема в том, что DAG сейчас очень длинная (и медленная) (так как я повторяю более 12 месяцев). Некоторые из этих задач определенно могут выполняться параллельно.
Я попытался проверить, есть ли у вспомогательной функции цепочки способ параллельного соединения, но ничего не смог найти.
Есть какие-нибудь предложения/идеи?
Спасибо.
Комментарии:
1. Не могли бы вы, пожалуйста, добавить эскиз, как, по вашему мнению, будет выглядеть DAG в пользовательском интерфейсе? Существует ли какая-либо зависимость между месяцами или все месяцы могут выполняться параллельно?
2. Между месяцами нет никакой зависимости. На самом деле все они могут выполняться одновременно с записью задач в ежемесячные разделы.
Ответ №1:
Вы можете использовать chain
, но это на самом деле не имеет значения здесь.
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 6, 7),
}
dag = DAG(
"stackoverflow_question",
default_args=default_args,
schedule_interval="@daily",
)
month_lst_ds = ['Dec', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov']
start_op = DummyOperator(task_id='start_task', dag=dag)
end_op = DummyOperator(task_id='end_task', dag=dag)
for month_ds in month_lst_ds:
month_op = DummyOperator(task_id=f"dummy_start_trx_table_imports_{month_ds}", dag=dag)
start_op >> month_op >> end_op
Если вы хотите, вы можете заменить
start_op >> month_op >> end_op
с
chain(start_op, month_op, end_op )
Вот как выглядит структура DAG в представлении графика:
Комментарии:
1. Это выглядит очень хорошо. Я попробовал это с помощью DAG, но как только start_op завершает весь запуск, он также помечается как успешный и не выполняет фактические ежемесячные задачи. Есть идеи, почему?
2. Это еще один вопрос, который не связан со структурой DAG… для этого, пожалуйста, опубликуйте свой реальный код в новом вопросе. Ваш код, вероятно, не является DummyOperators
3. @ArturoBelano У меня иногда возникали проблемы с пропусками в подобных массивно-параллельных структурах, но я предполагал, что это связано либо с версией воздушного потока, либо с ограничениями на параллельную обработку воздушного потока. Элад прав в том, что отладка структуры должна быть отдельным вопросом, если только отображаемая им структура существенно не отличается от того, что вы намеревались.
4. @SarahMesser это верное замечание. Ответ Элада верен. Спасибо за помощь!