Как пометить PostgresOperator DAG как сбой при условии?

#postgresql #airflow #airflow-scheduler

#postgresql #поток воздуха #воздушный поток-планировщик

Вопрос:

У меня ежедневно в определенный час запускается база данных Airflow. Это оператор PG и извлекает таблицу. например, следующее:

 select count(*) from my_table where date_insert='{{ds}}'
 

Теперь мне нужно пометить эту задачу как неудачную, если это значение равно нулю. Итак, я предполагаю, что мне нужно вернуть данные из базы данных в Python.

Ответ №1:

Это вариант использования SQLCheckOperator, который совместим с PostgreSQL:

 from airflow.operators.sql import SQLCheckOperator
SQLCheckOperator(
    conn_id='your_conn',
    sql="""select count(*) from my_table where date_insert='{{ds}}'""",
    
)
 

Оператор вызовет AirflowException if count==0 .

Комментарии:

1. Работает как шарм! Спасибо!