Как отправить задание spark с помощью pythonOpearator и BranchPythonOperator в Airflow

#python #airflow

Вопрос:

Я хочу написать файл DAG с помощью BranchPjthonOpeator для выполнения задачи на основе условия.

Задача 1: должна выполняться только в воскресенье Задача 2: должна выполняться ежедневно

Задача будет содержать команду для отправки задания spark

Я ищу образец файла DAG

Ответ №1:

Это BranchPythonOpeator здесь не нужно. Если вы хотите использовать ветвление, вы можете использовать BranchDayOfWeekOperator как:

 from airflow.operators.weekday import BranchDayOfWeekOperator
from airflow.operators.dummy import DummyOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.weekday import WeekDay


from airflow import DAG
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 8, 19),
}

with DAG('stackoverflow_question',
         default_args=default_args,
         schedule_interval='@daily',
         ) as dag:

    branch_op = BranchDayOfWeekOperator(
        follow_task_ids_if_true="sunday_task",
        follow_task_ids_if_false="end_task",
        week_day=WeekDay.SUNDAY,
        use_task_execution_day=False,  #Change to True if you want to compare to DAG execution_date
        task_id="branch_task"
    )

    sunday_op = SparkSubmitOperator(
        application="${SPARK_HOME}/examples/src/main/python/task1.py",
        task_id="sunday_task"
    )
    daily_op = SparkSubmitOperator(
        application="${SPARK_HOME}/examples/src/main/python/task2.py",
        task_id="daily_task"
    )

    end_op = DummyOperator(task_id="end_task")

    daily_op >> branch_op >> [sunday_op, end_op]
 

введите описание изображения здесь

Лично мне не нравится использовать операторов ветвления, когда нет реального случая ветвления. Обратите внимание, что я добавил здесь DummyOperator . Если нет по крайней мере 2 реальных ответвлений, которым можно следовать, я предпочитаю использовать кондиционирование с ShortCircuitOperator помощью:

 from airflow.operators.python import ShortCircuitOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

from airflow import DAG
from datetime import datetime, date


def is_sunday():
    # you can also compare to DAG execution_date if needed
    if date.today().isoweekday() == 6:  # 6 is Sunday
        return True
    return False

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 8, 19),
}

with DAG('stackoverflow_question',
         default_args=default_args,
         schedule_interval='@daily',
         ) as dag:

    sunday_op = SparkSubmitOperator(
        application="${SPARK_HOME}/examples/src/main/python/task1.py",
        task_id="sunday_task"
    )
    daily_op = SparkSubmitOperator(
        application="${SPARK_HOME}/examples/src/main/python/task2.py",
        task_id="daily_task"
    )

    short_op = ShortCircuitOperator(
        task_id='is_sundy',
        python_callable=is_sunday,
    )

    daily_op >> short_op >> sunday_op
 

введите описание изображения здесь

В этом решении ShortCircuitOperator последующая задача будет выполнена только в воскресенье, в течение остальной части недели она будет пропущена.