#airflow-scheduler #airflow
#планировщик воздушного потока #воздушный поток
Вопрос:
У меня есть задание, которое выполняется в 13:30. Для выполнения первой задачи требуется почти 1 час, после чего нам нужно подождать 15 минут. Итак, я использую Timedeltasensor, как показано ниже.
waitfor15min = TimeDeltaSensor(
task_id='waitfor15min',
delta=timedelta(minutes=15),
dag=dag)
Однако в журналах отображается schedule_interval 15 минут, как показано ниже
[2020-11-05 20:36:27,013] {time_delta_sensor.py:45} INFO - Checking if the time (2020-11-05T13:45:00 00:00) has come
[2020-11-05 20:36:27,013] {base_sensor_operator.py:79} INFO - Success criteria met. Exiting.
[2020-11-05 20:36:30,655] {logging_mixin.py:95} INFO - [2020-11-05 20:36:30,655] {jobs.py:2612} INFO - Task exited with return code 0
Как я могу создать задержку между заданиями??
Ответ №1:
Вы могли бы использовать PythonOperator
и написать функцию, которая просто ждет 15 минут. Существует пример того, как может выглядеть задача ожидания:
def my_sleeping_function(random_base, **kwargs)):
"""This is a function that will run within the DAG execution"""
time.sleep(random_base)
# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
for i in range(5):
task = PythonOperator(
task_id='sleep_for_' str(i),
python_callable=my_sleeping_function,
op_kwargs={'random_base': float(i) / 10},
provide context=true,
dag=dag,
)
run_this >> task
Комментарии:
1. Привет @Philipp выше, функция python выдает ошибку. Пожалуйста, измените определение my_sleeping_function(random_base) на my_sleeping_function(random_base, ** kwargs)
2. Вышеуказанная функция должна просто выдавать ошибку, если у вас есть
provide context=true
в PythonOperator. Я все еще добавил оба. Пример был взят из официальной документации, на которую я ссылался.