#python #airflow #airflow-scheduler #directed-acyclic-graphs
Вопрос:
Я ищу способ установить, когда состоялся последний успешный запуск определенной задачи. Я знаю, что существует контекстная переменная prev_execution_date_success, но, согласно документам, в ней хранится информация обо всех задачах DAG, а не о составных задачах.
Мой DAG выглядит так: Программа DAG предназначена для сбоя по крайней мере одной задачи. В случае сбоя DAG запускается повторно. Я пытаюсь добиться того, чтобы экземпляр задачи не выполнял тяжелый код, если задача была успешно выполнена. Например, при повторном запуске DAG, если задача execute_query_heavy_code02 была успешно запущена (в тот же день), ее не следует запускать до следующего дня.
Ответ №1:
Быстрое решение (не лучшая практика, но достаточно хорошее) без изменения конструкции DAG состоит в том, чтобы иметь воздушный Variable
поток с датой последнего успешного выполнения execute_query_heavy_code02
. Сначала вы получаете значение переменной и сравниваете его с сегодняшним днем, и если дата старше, чем сегодня, выполните тяжелый код и установите значение переменной с помощью set()
функции.
Код будет выглядеть так:
# import section
from airflow.models import Variable
from datetime import datetime
# inside the execute_query_heavy_code02 function
last_execution = Variable.get('heavy_task_last_execution_date')
date_format = '%Y-%m-%d' # change as you want
last_execution_dt = datetime.strptime(last_execution, date_format).date()
today = datetime.utcnow().date()
if last_execution_dt == today:
# continue with next task
else:
# execute heavy code
Variable.set('heavy_task_last_execution_date', today.strftime(date_format))
Если вы хотите изменить какую-то часть дизайна, которую вы можете использовать BranchPythonOperator
(очень хорошо объяснено здесь).
Комментарии:
1. Я бы тоже согласился с этим. Но одна вещь, которую следует помнить, — это установить значение по умолчанию, если переменная не существует, чтобы не получать ошибок/предупреждений при первом запуске DAG.
last_execution = Variable.get('heavy_task_last_execution_date', default_var='whatever_here')
2. Определенно @KarolosK.! Вы также можете установить это во время создания переменной. Что-то вроде
1950-01-01
или только вчера.