#python #airflow
#python #воздушный поток
Вопрос:
Я пытаюсь опросить конечную точку HTML API на предмет новых данных и выполнить DAG только при наличии новых данных.
У меня есть датчик со стандартной функцией poke, которая вернет True
, если это так.
Мне интересно, возможно ли избежать запланированной задачи и выполнить задачу только тогда, когда датчик возвращает значение true? В настоящее время я запускаю DAG ежедневно, и время ожидания датчика истекает через 24 часа (таким образом, одновременно выполняется только один DAG). Однако, в случае, если новые данные поступают дважды в день, им придется подождать до обработки следующего запуска DAG.
Ответ №1:
Вы можете использовать 1 DAG, предназначенный для определения. И используйте другой dag для выполнения обработки.
Определение DAG:
датчик продолжает нажимать -> как только poke () вернет true, используйте TriggerDagRunOperator для запуска определения DAG -> используйте TriggerDagRunOperator для запуска обработки DAG
Обработка DAG:
обрабатывайте все, что хотите
Ответ №2:
Я делаю нечто подобное, я использую Python для мониторинга датчика вакуума и для включения или выключения различных вакуумных насосов в группе из 6 насосов. Очевидно, что я должен отслеживать датчик вакуума в максимально приближенном к реальному времени, насколько это практически возможно.
Я также не хотел загружать наш главный сервер. Итак, я подключил датчик к raspberry pi, и он запускает Python. Он имеет цикл, аналогичный (но немного более сложный, чем) следующему:
while True:
time.sleep(0.1)
bar = current_vacuum()
write_wtfii_log(bar)
......
В коде в write_wtfii_log(строка),
newdata = json.dumps({'bar': bar,
'pump_1': pump_status[1],
'pump_1_tor': pump_tor[1],
'pump_2': pump_status[2],
'pump_2_tor': pump_tor[2],
'pump_3': pump_status[3],
'pump_3_tor': pump_tor[3],
'pump_4': pump_status[4],
'pump_4_tor': pump_tor[4],
'pump_5': pump_status[5],
'pump_5_tor': pump_tor[5],
'pump_6': pump_status[6],
'pump_6_tor': pump_tor[6]})
url='http://10.0.0.178/flaskr/add_vacuum'
headers = {'Content-Type': 'application/json'}
try:
response = requests.post(url, data=newdata , headers=headers)
except:
print('Error trying to write log entry into wtfii')
pass
return()
Это означает, что мой главный сервер с удовольствием прослушивает новые показания, которые отправляются примерно каждую секунду. Главный сервер запускает flask с python sqlite3, где регистрируются показания. Вы могли бы закодировать это так, чтобы программы python отправляли только измененные показания вместо каждого считывания.
Raspberry pi, на котором выполняется этот python-код, имеет среднюю нагрузку около 0,02 и в настоящее время работает в течение 242 дней с момента последней перезагрузки.
Комментарии:
1. Я понимаю, что вы пытаетесь сделать, но я не уверен, как это связано с apache-airflow?
2. Я совершенно неправильно понял вопрос, я увидел воздушный поток и подумал, что это датчики давления воздуха, которые измеряются дважды в день. Упс!