Повторное использование переменной кромки/поддага в DAG воздушного потока

#python #airflow

Вопрос:

Итак, я менял DAG своего рабочего ETL, когда обнаружил что-то подобное.

 # (...)

def build_spark_task(**kwargs) -> SparkTaskConfig:
    spark_task = SparkTaskConfig(...)

    # (...)
    
    return spark_task

def get_spark_k8s_tasks_tuple(**kwargs) -> (SparkKubernetesOperator, SparkKubernetesSensor):
    create_k8s_app = SparkKubernetesOperator(...)  # the task
    wait_app_completion = SparkKubernetesSensor(...)  # the task sensor

    # (...)

    return create_k8s_app, wait_app_completion


# Bronze layer stuff
task_bronze = build_spark_task(...)

# Silver layer stuff
task_silver = build_spark_task(...)

# Some other ETL
task_etl_1 = build_spark_task(...)

# Some other ETL 2
task_etl_2 = build_spark_task(...)

with DAG(
    dag_id=DAG_ID,
    schedule_interval=SCHEDULE,
    description=DESCRIPTION,
    default_args=DEFAULT_ARGS,
    catchup=False,
    concurrency=10,
    template_searchpath=TEMPLATE_DIR,
) as dag:

    start_dag = DummyOperator(task_id="start_dag")
    end_dag = DummyOperator(task_id="end_dag")

    run_bronze, wait_bronze = get_spark_k8s_tasks_tuple(task_bronze)

    run_silver, wait_silver = get_spark_k8s_tasks_tuple(task_silver)

    run_etl_1, wait_etl_1 = get_spark_k8s_tasks_tuple(task_etl_1)

    run_etl_2, wait_etl_2 = get_spark_k8s_tasks_tuple(task_etl_2)

    # Main path
    dag_build = start_dag >> run_bronze >> wait_bronze >> run_silver >> wait_silver

    # ETL 1 path
    dag_build >> run_etl_1, wait_etl_1 >> end_dag

    # ETL 2 path
    dag_build >> run_etl_2 >> wait_etl_2 >> end_dag
 

что приводит к появлению DAG, как показано ниже.

 start_dag -> run_bronze -> wait_bronze -> run_silver -> (...)

                      -> run_etl_1 -> wait_etl_1 -
(...) -> wait_silver /                             -> end_dag
                     -> run_etl_2 -> wait_etl_2 -/
 

Я подумал, что за процессом «run_» должно следовать соответствующее «wait_», поэтому я мог бы заменить возврат этой функции чем-то вроде

 
def get_spark_k8s_tasks_tuple(**kwargs) -> (SparkKubernetesOperator, SparkKubernetesSensor):
    create_k8s_app = SparkKubernetesOperator(...)  # the task
    wait_app_completion = SparkKubernetesSensor(...)  # the task sensor

    # (...)

    return create_k8s_app >> wait_app_completion  # <== to this
 

и мы бы тогда сделали более чистый код (ИМХО)

 
    run_bronze = get_spark_k8s_tasks_tuple(task_bronze)

    run_silver = get_spark_k8s_tasks_tuple(task_silver)

    run_etl_1 = get_spark_k8s_tasks_tuple(task_etl_1)

    run_etl_2 = get_spark_k8s_tasks_tuple(task_etl_2)

    # Main path
    dag_build = start_dag >> run_bronze >> run_silver >> end_dag

    # ETL 1 path
    dag_build >> run_etl_1 >> end_dag

    # ETL 2 path
    dag_build >> run_etl_2 >> end_dag

 

Но эти правки не привели к тому, что я искал DAG. Вместо этого у него были некоторые задачи «run_», начинающиеся из ниоткуда, и задачи «wait_» соединяли их с основным краем пути.

Как использовать переменные Python для отражения большего, чем узел DAG, и включения кода для сушки?

Ответ №1:

Я не думаю, что это самый чистый способ сделать это, но если вы хотите «исправить» свой DAG, просто передайте его в dag качестве аргумента функции, назначьте его dag параметру оператора, используйте определение соединения ( >> ) и ничего не возвращайте из функции.

Поскольку вы находитесь в контексте DAG ( with DAG(...) as dag ) Воздушный поток выполняет задание за кулисами, но когда вы выходите за пределы этой области (например, внутри функции Python get_spark_k8s_tasks_tuple ), вы оказываетесь вне этого контекста.

Ваша функция будет выглядеть следующим образом:

 def get_spark_k8s_tasks_tuple(dag, **kwargs) -> None:
    create_k8s_app = SparkKubernetesOperator(..., dag=dag)  # the task
    wait_app_completion = SparkKubernetesSensor(..., dag=dag)  # the task sensor

    # (...)

    create_k8s_app >> wait_app_completion
    # NO return