Apache Airflow дата последнего успешного выполнения задачи

#python #airflow #airflow-scheduler #directed-acyclic-graphs

Вопрос:

Я ищу способ установить, когда состоялся последний успешный запуск определенной задачи. Я знаю, что существует контекстная переменная prev_execution_date_success, но, согласно документам, в ней хранится информация обо всех задачах DAG, а не о составных задачах.

Мой DAG выглядит так: введите описание изображения здесь Программа DAG предназначена для сбоя по крайней мере одной задачи. В случае сбоя DAG запускается повторно. Я пытаюсь добиться того, чтобы экземпляр задачи не выполнял тяжелый код, если задача была успешно выполнена. Например, при повторном запуске DAG, если задача execute_query_heavy_code02 была успешно запущена (в тот же день), ее не следует запускать до следующего дня.

Ответ №1:

Быстрое решение (не лучшая практика, но достаточно хорошее) без изменения конструкции DAG состоит в том, чтобы иметь воздушный Variable поток с датой последнего успешного выполнения execute_query_heavy_code02 . Сначала вы получаете значение переменной и сравниваете его с сегодняшним днем, и если дата старше, чем сегодня, выполните тяжелый код и установите значение переменной с помощью set() функции.

Код будет выглядеть так:

 # import section
from airflow.models import Variable
from datetime import datetime

# inside the execute_query_heavy_code02 function
last_execution = Variable.get('heavy_task_last_execution_date')
date_format = '%Y-%m-%d'  # change as you want
last_execution_dt = datetime.strptime(last_execution, date_format).date()
today = datetime.utcnow().date()

if last_execution_dt == today:
    # continue with next task
else:
    # execute heavy code
    Variable.set('heavy_task_last_execution_date', today.strftime(date_format))
 

Если вы хотите изменить какую-то часть дизайна, которую вы можете использовать BranchPythonOperator (очень хорошо объяснено здесь).

Комментарии:

1. Я бы тоже согласился с этим. Но одна вещь, которую следует помнить, — это установить значение по умолчанию, если переменная не существует, чтобы не получать ошибок/предупреждений при первом запуске DAG. last_execution = Variable.get('heavy_task_last_execution_date', default_var='whatever_here')

2. Определенно @KarolosK.! Вы также можете установить это во время создания переменной. Что-то вроде 1950-01-01 или только вчера.