Airflow запускает DAG в любое время после обновления таблицы Google

#python #google-cloud-platform #airflow #directed-acyclic-graphs

#python #google-облачная платформа #воздушный поток #directed-acyclic-graphs

Вопрос:

Могу ли я каким-либо образом запланировать запуск DAG сразу после обновления таблицы Google?

Не уверен, получу ли я какой-либо ответ из этого документа: https://airflow.readthedocs.io/en/latest/_api/airflow/providers/google/suite/hooks/sheets/index.html

Ответ №1:

направление @ Alejandro правильное, но оно просто расширяет его ответ. Вы можете использовать оператор HttpSensor для выполнения запроса get к файлу таблицы с помощью API Google Drive

 HttpSensor(
    task_id='http_sensor_check',
    http_conn_id='http_default',
    endpoint='https://www.googleapis.com/drive/v3/files/fileId',
    request_params={},
    response_check=,
    poke_interval=5,
    dag=dag,
)
  

Теперь, согласно документации по возврату ответа, он должен возвращать modeifiedtime, который вы можете увидеть в ответе в response_check

 response_check=lambda response: response.json()['modifiedTime'] > last_time_stored
  

Вы можете заменить это лямбда-выражение и извлечь значение из своей базы данных или кэша и т. Д.

Запуск сразу после:: Теперь вы можете использовать next operator в сочетании с этим датчиком для условного запуска.

Примечание: здесь poke_Interval зависит от варианта использования, от того, как часто вы хотите проверять наличие изменений.

Комментарии:

1. Я попробую и вернусь к вам, ребята! Спасибо! Очень полезно

2. В этом случае мне не нужно указывать schedule_interval в моем скрипте, верно?

3. под интервалом schudule вы имеете в виду poke_interval? если вам не нужно значение по умолчанию, тогда вам нужно. sensor запускает api через определенный интервал

4. Извините за глупый вопрос и выходящий за рамки, чтобы прочитать ответ, мне нужно получить доступ к API. Итак, я должен предоставить учетную запись службы для этих конкретных листов, чтобы иметь возможность читать лист, правильно?

Ответ №2:

Вы можете использовать HTTPOperator вместе с Google Drive API https://developers.google.com/drive/api/v3/reference/files/get

Вы также можете написать свою собственную реализацию, см. WebHDFSHook и WebHDFSSensor для справки

Комментарии:

1. То же, что и я, я думал о сенсорном подходе.

2. в контрибе такого датчика пока нет, также нет метода в написанном хуке. в этом случае лучше написать собственный датчик самостоятельно. лучшим способом будет расширить HTTPOperator с помощью подхода, изложенного ниже. Хорошей ссылкой для реализации будут также WebHDFSHook и WebHDFSSensor, оба из которых используют WebHDFS REST API

3. разве это не так airflow.readthedocs.io/en/latest/_api/airflow/providers/http /… ?

4. но это все равно приведет к тому, что ответ на ответ будет получен правильно? Выполняет оператор HTTP GET и возвращает False при сбое, вызванном 404 Not Found или response_check, возвращающим False . Коды ошибок HTTP, отличные от 404 (например, 403) или ошибка отказа в подключении, приведут к сбою самого датчика напрямую (больше не нужно тыкать).

5. Да, вы правы, но это будут крайние случаи, OP хотел проверить, изменены ли существующие файлы.