#python #airflow
Вопрос:
Я хочу написать файл DAG с помощью BranchPjthonOpeator для выполнения задачи на основе условия.
Задача 1: должна выполняться только в воскресенье Задача 2: должна выполняться ежедневно
Задача будет содержать команду для отправки задания spark
Я ищу образец файла DAG
Ответ №1:
Это BranchPythonOpeator
здесь не нужно. Если вы хотите использовать ветвление, вы можете использовать BranchDayOfWeekOperator
как:
from airflow.operators.weekday import BranchDayOfWeekOperator
from airflow.operators.dummy import DummyOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.weekday import WeekDay
from airflow import DAG
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 8, 19),
}
with DAG('stackoverflow_question',
default_args=default_args,
schedule_interval='@daily',
) as dag:
branch_op = BranchDayOfWeekOperator(
follow_task_ids_if_true="sunday_task",
follow_task_ids_if_false="end_task",
week_day=WeekDay.SUNDAY,
use_task_execution_day=False, #Change to True if you want to compare to DAG execution_date
task_id="branch_task"
)
sunday_op = SparkSubmitOperator(
application="${SPARK_HOME}/examples/src/main/python/task1.py",
task_id="sunday_task"
)
daily_op = SparkSubmitOperator(
application="${SPARK_HOME}/examples/src/main/python/task2.py",
task_id="daily_task"
)
end_op = DummyOperator(task_id="end_task")
daily_op >> branch_op >> [sunday_op, end_op]
Лично мне не нравится использовать операторов ветвления, когда нет реального случая ветвления. Обратите внимание, что я добавил здесь DummyOperator
. Если нет по крайней мере 2 реальных ответвлений, которым можно следовать, я предпочитаю использовать кондиционирование с ShortCircuitOperator
помощью:
from airflow.operators.python import ShortCircuitOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow import DAG
from datetime import datetime, date
def is_sunday():
# you can also compare to DAG execution_date if needed
if date.today().isoweekday() == 6: # 6 is Sunday
return True
return False
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 8, 19),
}
with DAG('stackoverflow_question',
default_args=default_args,
schedule_interval='@daily',
) as dag:
sunday_op = SparkSubmitOperator(
application="${SPARK_HOME}/examples/src/main/python/task1.py",
task_id="sunday_task"
)
daily_op = SparkSubmitOperator(
application="${SPARK_HOME}/examples/src/main/python/task2.py",
task_id="daily_task"
)
short_op = ShortCircuitOperator(
task_id='is_sundy',
python_callable=is_sunday,
)
daily_op >> short_op >> sunday_op
В этом решении ShortCircuitOperator
последующая задача будет выполнена только в воскресенье, в течение остальной части недели она будет пропущена.