добавьте задержку для задачи до тех пор, пока конкретный файл не будет перемещен из корзины

# #python #google-cloud-storage #airflow

Вопрос:

Я новичок в воздушном потоке. Я должен проверить, не перемещен ли файл, сгенерированный из DAG (eg: sample.txt) , из корзины(в моем случае файл, который я сгенерировал, будет удален из корзины, когда его заберет другая система, и тогда в корзине не будет этого выходного файла. Может потребоваться несколько минут, чтобы файл был удален из корзины)

Как добавить задачу в тот же DAG, где она ожидает/завершает работу, пока файл не будет удален из корзины, и когда sample.txt файл удаляется, затем приступайте к выполнению следующей задачи.

Существует ли какой-либо оператор, удовлетворяющий вышеуказанным критериям? пожалуйста, пролейте немного света на то, как действовать дальше

Комментарии:

1. существует класс Operators called Sensors , который вы , возможно, захотите использовать S3Sensor , он фактически проверяет, существует ли файл в определенном месте в s3 или нет. ваше требование на самом деле противоположно тому, что оно делает, но вы должны уметь использовать его для достижения того же самого.

2. да, я столкнулся с этим оператором, и я хочу выполнить что-то противоположное тому, что делает датчик, как вы сказали, я хочу понять, как использовать sensor оператора, если его можно настроить для удовлетворения вышеуказанного требования. Как добавить условие для ожидания ‘sample.txt» файл, который нужно переместить из корзины, а затем перейти к следующей задаче. @АнандВидват

3. вы можете расширить S3KeySensor и переопределить execute метод для выполнения ваших требований. пожалуйста, ознакомьтесь с реализацией метода execute: airflow.apache.org/docs/apache-airflow/1.10.4/_modules/airflow/…

4. поскольку вашим облачным провайдером является GCP, вам, возможно, также придется настроить реализацию функции poke, я не знаю, доступен ли датчик файлов для gcp или нет.

Ответ №1:

Вы можете создать пользовательский датчик на основе текущего датчика GCSObjectExistenceSensor

Модификация проста:

 from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
class GCSObjectNotExistenceSensor(GCSObjectExistenceSensor):

    def poke(self, context: dict) -> bool:
        self.log.info('Sensor checks if : %s, %s does not exist', self.bucket, self.object)
        hook = GCSHook(
            gcp_conn_id=self.google_cloud_conn_id,
            delegate_to=self.delegate_to,
            impersonation_chain=self.impersonation_chain,
        )
        return not hook.exists(self.bucket, self.object)
    
 

Затем используйте датчик GCSObjectNotExistenceSensor в своем коде, например:

 gcs_object_does_not_exists = GCSObjectNotExistenceSensor(
    bucket=BUCKET_1,
    object=PATH_TO__FILE,
    mode='poke',
    task_id="gcs_object_does_not_exists_task",
)
 

Датчик не позволит трубопроводу продолжаться до тех пор, пока объект PATH_TO__FILE не будет удален.

Комментарии:

1. @RvR все датчики наследуются от BaseSensorOperator poke_interval значения по умолчанию имеет. вы можете перезаписать это в датчике.

2. Элад, добавьте свой комментарий выше к решению в качестве расширенного требования к вопросу.

Ответ №2:

Для выполнения этой задачи вы можете использовать PythonOperator воздушного потока. Сделайте Python вызываемым, постоянно нажимайте GCS и проверяйте, удален ли файл. Возврат из функции Python при удалении файла из GCS.

 from airflow.operators.python_operator import PythonOperator
from google.cloud import storage
import google.auth

def check_file_in_gcs():
    credentials, project = google.auth.default()
    storage_client = storage.Client('your_Project_id', credentials=credentials)
    name = 'sample.txt'   
    bucket_name = 'Your_Bucket_name'
    bucket = storage_client.bucket(bucket_name)
    while True:
        stats = storage.Blob(bucket=bucket, name=name).exists(storage_client)
        if not stats:
           print("Returning as file is removed!!!!")
           return

check_gcs_file_removal = PythonOperator(
            task_id='check_gcs_file_removal',
            python_callable= check_file_in_gcs,
            #op_kwargs={'params': xyz},
            #Pass bucket name and other details if needed by commentating above 
            dag=dag
        )
 

возможно, вам потребуется установить пакеты Python для работы облачных библиотек Google. Пожалуйста, установите один из них снизу. (Не уверен, какой именно из них установить.Взято из моего virtualenv)

 google-api-core==1.16.0
google-api-python-client==1.8.0
google-auth==1.12.0
google-auth-httplib2==0.0.3
google-auth-oauthlib==0.4.1
google-cloud-core==1.3.0
google-cloud-storage==1.27.0
 

Комментарии:

1. Это хорошо работает, если ваша задача должна выполняться только в течение небольшого промежутка времени, скажем, пару минут, но если задача должна выполняться в течение нескольких часов, то рабочий узел будет очень занят, поэтому такой подход не рекомендуется