Задержка датчика времени с интервалом расписания

#airflow-scheduler #airflow

#планировщик воздушного потока #воздушный поток

Вопрос:

У меня есть задание, которое выполняется в 13:30. Для выполнения первой задачи требуется почти 1 час, после чего нам нужно подождать 15 минут. Итак, я использую Timedeltasensor, как показано ниже.

 waitfor15min = TimeDeltaSensor(
        task_id='waitfor15min',
        delta=timedelta(minutes=15),
        dag=dag)

  

Однако в журналах отображается schedule_interval 15 минут, как показано ниже

 [2020-11-05 20:36:27,013] {time_delta_sensor.py:45} INFO - Checking if the time (2020-11-05T13:45:00 00:00) has come
[2020-11-05 20:36:27,013] {base_sensor_operator.py:79} INFO - Success criteria met. Exiting.
[2020-11-05 20:36:30,655] {logging_mixin.py:95} INFO - [2020-11-05 20:36:30,655] {jobs.py:2612} INFO - Task exited with return code 0
  

Как я могу создать задержку между заданиями??

Ответ №1:

Вы могли бы использовать PythonOperator и написать функцию, которая просто ждет 15 минут. Существует пример того, как может выглядеть задача ожидания:

 def my_sleeping_function(random_base, **kwargs)):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)


# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
for i in range(5):
    task = PythonOperator(
        task_id='sleep_for_'   str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
        provide context=true,
        dag=dag,
    )

    run_this >> task
  

Комментарии:

1. Привет @Philipp выше, функция python выдает ошибку. Пожалуйста, измените определение my_sleeping_function(random_base) на my_sleeping_function(random_base, ** kwargs)

2. Вышеуказанная функция должна просто выдавать ошибку, если у вас есть provide context=true в PythonOperator. Я все еще добавил оба. Пример был взят из официальной документации, на которую я ссылался.