#python #airflow
#python #поток воздуха
Вопрос:
У меня есть dag, который будет запланирован каждый час. Допустим, 01:00, 02:00, 03:00. Допустим, выбрано 02:00, но если 01:00 запуск dag все еще продолжается, необходимо отменить экземпляр 02:00am.
Я пробую этот код.
local_tz = pendulum.timezone("America/Chicago")
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'start_date': datetime(2020, 11, 15, tzinfo=local_tz),
'run_as_user': user_id
}
dag = DAG(os.path.basename(__file__).replace(".pyc", "").replace(".py", ""),
catchup=False,
max_active_runs=1
schedule_interval='0 * * * *', #schedule_interval='@hourly'
default_args=default_args
)
def check_prev_dag_run_status(**kwargs):
curr_dag_id = kwargs['dag'].dag_id
curr_task_id = kwargs['task'].task_id
newdate = kwargs['execution_date']
ti = TaskInstance(curr_dag_id, curr_task_id, newdate)
state = ti.current_state()
if state=="running":
raise ValueError("Not all previous tasks successfully completed")
check_success_task = PythonOperator(
task_id='check_status',
python_callable= check_prev_dag_run_status,
provide_context=True,
dag=dag
)
run_this_0 = BashOperator(
task_id='run_shell',
bash_command="ksh runshellscript.ksh",
execution_timeout=None,
dag=dag
)
Я получаю сообщение об ошибке, которое
[2020-11-17 12:30:07,337] {taskinstance.py:1150} ОШИБКА — объект ‘str’ не имеет атрибута ‘dag_id’
Трассировка (последний последний вызов): File «/airflow/bd/pyenv/pycdr/lib/python3.7/site-packages/airflow/models/taskinstance.py «, строка 984, в _run_raw_task результат = task_copy.execute(контекст=контекст)
Файл «/airflow/bd/pyenv/pycdr/lib/python3.7/site-packages/airflow/operators/python_operator.py «, строка 113, в execute return_value = self.execute_callable()
Файл «/airflow/bd/pyenv/pycdr/lib/python3.7/site-packages/airflow/operators/python_operator.py «, строка 118, в execute_ca
* Пожалуйста, предложите мне
- чего мне не хватает при передаче аргументов в airflow.models.taskinstance.TaskInstance
- Предоставляет ли execution_date непосредственный предыдущий экземпляр запуска dag? или как получить состояние выполнения предыдущего экземпляра базы данных*
Комментарии:
1. execution_date указывает дату выполнения текущего запуска DAG, а не предыдущего запуска, см. Также Планировщик потока данных
2. На самом деле я ищу то, чего мне не хватает при передаче аргументов в airflow.models.taskinstance. TaskInstance
Ответ №1:
airflow.models.taskinstance.TaskInstance
просто принимает два аргумента, task
а execution_date
не 3, как в вашем коде. Кроме task
того, это не та task_id
, а скорее определенная задача, в вашем примере это run_this_0
, я думаю. Вам нужно передать execution_date
значение последнего запуска задачи, а не текущего. Также статус может отличаться от запущенного и оставаться неудачным, поэтому я бы также изменил это.
Собрав все это вместе, следующий код должен быть в порядке, чтобы проверить, был ли run_this_0
предыдущий запуск DAG успешным:
def check_prev_dag_run_status(**kwargs):
newdate = kwargs['prev_execution_date']
ti = TaskInstance(run_this_0, newdate)
state = ti.current_state()
if state!="success":
raise ValueError("Not all previous tasks successfully completed")
Ответ №2:
Установите для этой задачи совпадение задач равным 1. Более новая задача не будет выполняться, если предыдущий запуск не был запущен.
Набор зависит от прошлого значения true. Недостатком этого является то, что при сбое одного пакета следующие пакеты не будут выполняться.
Используйте внешний датчик задач с перепланированием режима, чтобы дождаться завершения предыдущего пакета.