Как установить объект времени на SLA в воздушном потоке вместо timedelta?

#python #python-3.x #airflow #airflow-scheduler

Вопрос:

Я пытаюсь внедрить SLA в свой блок управления воздушным потоком.

Я знаю, как работают SLA, вы устанавливаете объект timedelta, и если задача не будет выполнена в течение этого времени, он отправит электронное письмо и уведомит, что задача еще не выполнена.

Мне нужна аналогичная функциональность, но вместо указания продолжительности я хочу установить определенное время в SLA. Например, если задание не выполнено до 8:00 утра, оно отправляет электронное письмо и уведомляет менеджера. Что-то вроде этого:

 'sla': time(hour=8, minute=0, second=0)
 

Я много искал, но ничего не нашел.

Есть ли какое-либо решение этой конкретной проблемы? или какие-либо другие решения, кроме SLA?

Заранее спасибо.

Ответ №1:

SLA парам BaseOperator ожидает datetime.timedelta объект, так что там больше нечего делать. Примите во внимание, что SLA это представляет собой временную разницу после окончания запланированного периода. Пример из документов предполагает, что DAG планируется ежедневно:

Например, если вы установите SLA на 1 час, планировщик отправит электронное письмо вскоре после 1:00 ночи на 2016-01-02, если экземпляр 2016-01-01 еще не удался.

Дело в том, что это всегда разница во времени с периодом расписания, а это не то, что вы ищете.

Поэтому я думаю, что вам следует использовать другой подход, например, планировать свою DAG всякий раз, когда вам это нужно, выполнять нужные задачи, а затем добавить оператора датчика, чтобы проверить, выполнено ли условие, которое вы ищете, или нет. Существует несколько типов датчиков, в зависимости от контекста, в котором вы находитесь, вы можете выбрать один из них.

Другим вариантом может быть, создать новый Даг , посвященный проверить, если задачи выполнены в оригинальном Даг были выполнены успешно или нет, и действовать соответствующим образом (например, отправки писем и т. д.). Для этого можно использовать ExternalTaskSensor , проверить в режиме онлайн-уроки о том, как реализовать это, хотя это может быть проще, чтобы избежать перекрестного Даг зависимости как указано в документации.

Надеюсь, что это может направить вас в правильном направлении.