Использование датчика воздушного потока для запуска DAG run

#python #airflow

#python #воздушный поток

Вопрос:

Я пытаюсь опросить конечную точку HTML API на предмет новых данных и выполнить DAG только при наличии новых данных.

У меня есть датчик со стандартной функцией poke, которая вернет True , если это так.

Мне интересно, возможно ли избежать запланированной задачи и выполнить задачу только тогда, когда датчик возвращает значение true? В настоящее время я запускаю DAG ежедневно, и время ожидания датчика истекает через 24 часа (таким образом, одновременно выполняется только один DAG). Однако, в случае, если новые данные поступают дважды в день, им придется подождать до обработки следующего запуска DAG.

Ответ №1:

Вы можете использовать 1 DAG, предназначенный для определения. И используйте другой dag для выполнения обработки.


Определение DAG:

датчик продолжает нажимать -> как только poke () вернет true, используйте TriggerDagRunOperator для запуска определения DAG -> используйте TriggerDagRunOperator для запуска обработки DAG


Обработка DAG:

обрабатывайте все, что хотите

Ответ №2:

Я делаю нечто подобное, я использую Python для мониторинга датчика вакуума и для включения или выключения различных вакуумных насосов в группе из 6 насосов. Очевидно, что я должен отслеживать датчик вакуума в максимально приближенном к реальному времени, насколько это практически возможно.

Я также не хотел загружать наш главный сервер. Итак, я подключил датчик к raspberry pi, и он запускает Python. Он имеет цикл, аналогичный (но немного более сложный, чем) следующему:

     while True:
        time.sleep(0.1)
        bar = current_vacuum()
        write_wtfii_log(bar)
        ......
  

В коде в write_wtfii_log(строка),

      newdata = json.dumps({'bar': bar,
                        'pump_1': pump_status[1],
                        'pump_1_tor': pump_tor[1],
                        'pump_2': pump_status[2],
                        'pump_2_tor': pump_tor[2],
                        'pump_3': pump_status[3],
                        'pump_3_tor': pump_tor[3],
                        'pump_4': pump_status[4],
                        'pump_4_tor': pump_tor[4],
                        'pump_5': pump_status[5],
                        'pump_5_tor': pump_tor[5],
                        'pump_6': pump_status[6],
                        'pump_6_tor': pump_tor[6]})
url='http://10.0.0.178/flaskr/add_vacuum'
headers = {'Content-Type': 'application/json'}
try:
   response = requests.post(url, data=newdata , headers=headers)
except:
   print('Error trying to write log entry into wtfii')
   pass
return()
  

Это означает, что мой главный сервер с удовольствием прослушивает новые показания, которые отправляются примерно каждую секунду. Главный сервер запускает flask с python sqlite3, где регистрируются показания. Вы могли бы закодировать это так, чтобы программы python отправляли только измененные показания вместо каждого считывания.

Raspberry pi, на котором выполняется этот python-код, имеет среднюю нагрузку около 0,02 и в настоящее время работает в течение 242 дней с момента последней перезагрузки.

Комментарии:

1. Я понимаю, что вы пытаетесь сделать, но я не уверен, как это связано с apache-airflow?

2. Я совершенно неправильно понял вопрос, я увидел воздушный поток и подумал, что это датчики давления воздуха, которые измеряются дважды в день. Упс!