#python #google-cloud-platform #airflow #directed-acyclic-graphs
#python #google-облачная платформа #воздушный поток #directed-acyclic-graphs
Вопрос:
Могу ли я каким-либо образом запланировать запуск DAG сразу после обновления таблицы Google?
Не уверен, получу ли я какой-либо ответ из этого документа: https://airflow.readthedocs.io/en/latest/_api/airflow/providers/google/suite/hooks/sheets/index.html
Ответ №1:
направление @ Alejandro правильное, но оно просто расширяет его ответ. Вы можете использовать оператор HttpSensor для выполнения запроса get к файлу таблицы с помощью API Google Drive
HttpSensor(
task_id='http_sensor_check',
http_conn_id='http_default',
endpoint='https://www.googleapis.com/drive/v3/files/fileId',
request_params={},
response_check=,
poke_interval=5,
dag=dag,
)
Теперь, согласно документации по возврату ответа, он должен возвращать modeifiedtime, который вы можете увидеть в ответе в response_check
response_check=lambda response: response.json()['modifiedTime'] > last_time_stored
Вы можете заменить это лямбда-выражение и извлечь значение из своей базы данных или кэша и т. Д.
Запуск сразу после:: Теперь вы можете использовать next operator в сочетании с этим датчиком для условного запуска.
Примечание: здесь poke_Interval зависит от варианта использования, от того, как часто вы хотите проверять наличие изменений.
Комментарии:
1. Я попробую и вернусь к вам, ребята! Спасибо! Очень полезно
2. В этом случае мне не нужно указывать schedule_interval в моем скрипте, верно?
3. под интервалом schudule вы имеете в виду poke_interval? если вам не нужно значение по умолчанию, тогда вам нужно. sensor запускает api через определенный интервал
4. Извините за глупый вопрос и выходящий за рамки, чтобы прочитать ответ, мне нужно получить доступ к API. Итак, я должен предоставить учетную запись службы для этих конкретных листов, чтобы иметь возможность читать лист, правильно?
Ответ №2:
Вы можете использовать HTTPOperator вместе с Google Drive API https://developers.google.com/drive/api/v3/reference/files/get
Вы также можете написать свою собственную реализацию, см. WebHDFSHook и WebHDFSSensor для справки
Комментарии:
1. То же, что и я, я думал о сенсорном подходе.
2. в контрибе такого датчика пока нет, также нет метода в написанном хуке. в этом случае лучше написать собственный датчик самостоятельно. лучшим способом будет расширить HTTPOperator с помощью подхода, изложенного ниже. Хорошей ссылкой для реализации будут также WebHDFSHook и WebHDFSSensor, оба из которых используют WebHDFS REST API
3. разве это не так airflow.readthedocs.io/en/latest/_api/airflow/providers/http /… ?
4. но это все равно приведет к тому, что ответ на ответ будет получен правильно? Выполняет оператор HTTP GET и возвращает False при сбое, вызванном 404 Not Found или response_check, возвращающим False . Коды ошибок HTTP, отличные от 404 (например, 403) или ошибка отказа в подключении, приведут к сбою самого датчика напрямую (больше не нужно тыкать).
5. Да, вы правы, но это будут крайние случаи, OP хотел проверить, изменены ли существующие файлы.