#python #airflow
Вопрос:
Итак, я менял DAG своего рабочего ETL, когда обнаружил что-то подобное.
# (...)
def build_spark_task(**kwargs) -> SparkTaskConfig:
spark_task = SparkTaskConfig(...)
# (...)
return spark_task
def get_spark_k8s_tasks_tuple(**kwargs) -> (SparkKubernetesOperator, SparkKubernetesSensor):
create_k8s_app = SparkKubernetesOperator(...) # the task
wait_app_completion = SparkKubernetesSensor(...) # the task sensor
# (...)
return create_k8s_app, wait_app_completion
# Bronze layer stuff
task_bronze = build_spark_task(...)
# Silver layer stuff
task_silver = build_spark_task(...)
# Some other ETL
task_etl_1 = build_spark_task(...)
# Some other ETL 2
task_etl_2 = build_spark_task(...)
with DAG(
dag_id=DAG_ID,
schedule_interval=SCHEDULE,
description=DESCRIPTION,
default_args=DEFAULT_ARGS,
catchup=False,
concurrency=10,
template_searchpath=TEMPLATE_DIR,
) as dag:
start_dag = DummyOperator(task_id="start_dag")
end_dag = DummyOperator(task_id="end_dag")
run_bronze, wait_bronze = get_spark_k8s_tasks_tuple(task_bronze)
run_silver, wait_silver = get_spark_k8s_tasks_tuple(task_silver)
run_etl_1, wait_etl_1 = get_spark_k8s_tasks_tuple(task_etl_1)
run_etl_2, wait_etl_2 = get_spark_k8s_tasks_tuple(task_etl_2)
# Main path
dag_build = start_dag >> run_bronze >> wait_bronze >> run_silver >> wait_silver
# ETL 1 path
dag_build >> run_etl_1, wait_etl_1 >> end_dag
# ETL 2 path
dag_build >> run_etl_2 >> wait_etl_2 >> end_dag
что приводит к появлению DAG, как показано ниже.
start_dag -> run_bronze -> wait_bronze -> run_silver -> (...)
-> run_etl_1 -> wait_etl_1 -
(...) -> wait_silver / -> end_dag
-> run_etl_2 -> wait_etl_2 -/
Я подумал, что за процессом «run_» должно следовать соответствующее «wait_», поэтому я мог бы заменить возврат этой функции чем-то вроде
def get_spark_k8s_tasks_tuple(**kwargs) -> (SparkKubernetesOperator, SparkKubernetesSensor):
create_k8s_app = SparkKubernetesOperator(...) # the task
wait_app_completion = SparkKubernetesSensor(...) # the task sensor
# (...)
return create_k8s_app >> wait_app_completion # <== to this
и мы бы тогда сделали более чистый код (ИМХО)
run_bronze = get_spark_k8s_tasks_tuple(task_bronze)
run_silver = get_spark_k8s_tasks_tuple(task_silver)
run_etl_1 = get_spark_k8s_tasks_tuple(task_etl_1)
run_etl_2 = get_spark_k8s_tasks_tuple(task_etl_2)
# Main path
dag_build = start_dag >> run_bronze >> run_silver >> end_dag
# ETL 1 path
dag_build >> run_etl_1 >> end_dag
# ETL 2 path
dag_build >> run_etl_2 >> end_dag
Но эти правки не привели к тому, что я искал DAG. Вместо этого у него были некоторые задачи «run_», начинающиеся из ниоткуда, и задачи «wait_» соединяли их с основным краем пути.
Как использовать переменные Python для отражения большего, чем узел DAG, и включения кода для сушки?
Ответ №1:
Я не думаю, что это самый чистый способ сделать это, но если вы хотите «исправить» свой DAG, просто передайте его в dag
качестве аргумента функции, назначьте его dag
параметру оператора, используйте определение соединения ( >>
) и ничего не возвращайте из функции.
Поскольку вы находитесь в контексте DAG ( with DAG(...) as dag
) Воздушный поток выполняет задание за кулисами, но когда вы выходите за пределы этой области (например, внутри функции Python get_spark_k8s_tasks_tuple
), вы оказываетесь вне этого контекста.
Ваша функция будет выглядеть следующим образом:
def get_spark_k8s_tasks_tuple(dag, **kwargs) -> None:
create_k8s_app = SparkKubernetesOperator(..., dag=dag) # the task
wait_app_completion = SparkKubernetesSensor(..., dag=dag) # the task sensor
# (...)
create_k8s_app >> wait_app_completion
# NO return