Пропустить текущий запуск базы данных, если предыдущий запуск базы данных экземпляра находится в запущенном состоянии

#python #airflow

#python #поток воздуха

Вопрос:

У меня есть dag, который будет запланирован каждый час. Допустим, 01:00, 02:00, 03:00. Допустим, выбрано 02:00, но если 01:00 запуск dag все еще продолжается, необходимо отменить экземпляр 02:00am.

Я пробую этот код.

 local_tz = pendulum.timezone("America/Chicago")

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': datetime(2020, 11, 15, tzinfo=local_tz),
    'run_as_user': user_id
}

dag = DAG(os.path.basename(__file__).replace(".pyc", "").replace(".py", ""),
          catchup=False,
          max_active_runs=1
          schedule_interval='0 * * * *',  #schedule_interval='@hourly'
          default_args=default_args
)

def check_prev_dag_run_status(**kwargs):
    curr_dag_id = kwargs['dag'].dag_id
    curr_task_id = kwargs['task'].task_id
    newdate = kwargs['execution_date']
    ti = TaskInstance(curr_dag_id, curr_task_id, newdate)
    state = ti.current_state()
    if state=="running":
        raise ValueError("Not all previous tasks successfully completed")
        
check_success_task = PythonOperator(
    task_id='check_status',
    python_callable= check_prev_dag_run_status,
    provide_context=True,
    dag=dag
)

run_this_0 = BashOperator(
    task_id='run_shell',
        bash_command="ksh runshellscript.ksh",
        execution_timeout=None,
        dag=dag 
  )
  

Я получаю сообщение об ошибке, которое

[2020-11-17 12:30:07,337] {taskinstance.py:1150} ОШИБКА — объект ‘str’ не имеет атрибута ‘dag_id’

Трассировка (последний последний вызов): File «/airflow/bd/pyenv/pycdr/lib/python3.7/site-packages/airflow/models/taskinstance.py «, строка 984, в _run_raw_task результат = task_copy.execute(контекст=контекст)

Файл «/airflow/bd/pyenv/pycdr/lib/python3.7/site-packages/airflow/operators/python_operator.py «, строка 113, в execute return_value = self.execute_callable()

Файл «/airflow/bd/pyenv/pycdr/lib/python3.7/site-packages/airflow/operators/python_operator.py «, строка 118, в execute_ca

* Пожалуйста, предложите мне

  1. чего мне не хватает при передаче аргументов в airflow.models.taskinstance.TaskInstance
  2. Предоставляет ли execution_date непосредственный предыдущий экземпляр запуска dag? или как получить состояние выполнения предыдущего экземпляра базы данных*

Комментарии:

1. execution_date указывает дату выполнения текущего запуска DAG, а не предыдущего запуска, см. Также Планировщик потока данных

2. На самом деле я ищу то, чего мне не хватает при передаче аргументов в airflow.models.taskinstance. TaskInstance

Ответ №1:

airflow.models.taskinstance.TaskInstance просто принимает два аргумента, task а execution_date не 3, как в вашем коде. Кроме task того, это не та task_id , а скорее определенная задача, в вашем примере это run_this_0 , я думаю. Вам нужно передать execution_date значение последнего запуска задачи, а не текущего. Также статус может отличаться от запущенного и оставаться неудачным, поэтому я бы также изменил это.

Собрав все это вместе, следующий код должен быть в порядке, чтобы проверить, был ли run_this_0 предыдущий запуск DAG успешным:

 def check_prev_dag_run_status(**kwargs):
    newdate = kwargs['prev_execution_date']
    ti = TaskInstance(run_this_0, newdate)
    state = ti.current_state()
    if state!="success":
        raise ValueError("Not all previous tasks successfully completed")
  

Ответ №2:

Установите для этой задачи совпадение задач равным 1. Более новая задача не будет выполняться, если предыдущий запуск не был запущен.

Набор зависит от прошлого значения true. Недостатком этого является то, что при сбое одного пакета следующие пакеты не будут выполняться.

Используйте внешний датчик задач с перепланированием режима, чтобы дождаться завершения предыдущего пакета.