# #python #google-cloud-storage #airflow
Вопрос:
Я новичок в воздушном потоке. Я должен проверить, не перемещен ли файл, сгенерированный из DAG (eg: sample.txt)
, из корзины(в моем случае файл, который я сгенерировал, будет удален из корзины, когда его заберет другая система, и тогда в корзине не будет этого выходного файла. Может потребоваться несколько минут, чтобы файл был удален из корзины)
Как добавить задачу в тот же DAG, где она ожидает/завершает работу, пока файл не будет удален из корзины, и когда sample.txt файл удаляется, затем приступайте к выполнению следующей задачи.
Существует ли какой-либо оператор, удовлетворяющий вышеуказанным критериям? пожалуйста, пролейте немного света на то, как действовать дальше
Комментарии:
1. существует класс
Operators
calledSensors
, который вы , возможно, захотите использоватьS3Sensor
, он фактически проверяет, существует ли файл в определенном месте в s3 или нет. ваше требование на самом деле противоположно тому, что оно делает, но вы должны уметь использовать его для достижения того же самого.2. да, я столкнулся с этим оператором, и я хочу выполнить что-то противоположное тому, что делает датчик, как вы сказали, я хочу понять, как использовать
sensor
оператора, если его можно настроить для удовлетворения вышеуказанного требования. Как добавить условие для ожидания ‘sample.txt» файл, который нужно переместить из корзины, а затем перейти к следующей задаче. @АнандВидват3. вы можете расширить
S3KeySensor
и переопределитьexecute
метод для выполнения ваших требований. пожалуйста, ознакомьтесь с реализацией метода execute: airflow.apache.org/docs/apache-airflow/1.10.4/_modules/airflow/…4. поскольку вашим облачным провайдером является GCP, вам, возможно, также придется настроить реализацию функции poke, я не знаю, доступен ли датчик файлов для gcp или нет.
Ответ №1:
Вы можете создать пользовательский датчик на основе текущего датчика GCSObjectExistenceSensor
Модификация проста:
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
class GCSObjectNotExistenceSensor(GCSObjectExistenceSensor):
def poke(self, context: dict) -> bool:
self.log.info('Sensor checks if : %s, %s does not exist', self.bucket, self.object)
hook = GCSHook(
gcp_conn_id=self.google_cloud_conn_id,
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
return not hook.exists(self.bucket, self.object)
Затем используйте датчик GCSObjectNotExistenceSensor
в своем коде, например:
gcs_object_does_not_exists = GCSObjectNotExistenceSensor(
bucket=BUCKET_1,
object=PATH_TO__FILE,
mode='poke',
task_id="gcs_object_does_not_exists_task",
)
Датчик не позволит трубопроводу продолжаться до тех пор, пока объект PATH_TO__FILE
не будет удален.
Комментарии:
1. @RvR все датчики наследуются от
BaseSensorOperator
poke_interval
значения по умолчанию имеет. вы можете перезаписать это в датчике.2. Элад, добавьте свой комментарий выше к решению в качестве расширенного требования к вопросу.
Ответ №2:
Для выполнения этой задачи вы можете использовать PythonOperator воздушного потока. Сделайте Python вызываемым, постоянно нажимайте GCS и проверяйте, удален ли файл. Возврат из функции Python при удалении файла из GCS.
from airflow.operators.python_operator import PythonOperator
from google.cloud import storage
import google.auth
def check_file_in_gcs():
credentials, project = google.auth.default()
storage_client = storage.Client('your_Project_id', credentials=credentials)
name = 'sample.txt'
bucket_name = 'Your_Bucket_name'
bucket = storage_client.bucket(bucket_name)
while True:
stats = storage.Blob(bucket=bucket, name=name).exists(storage_client)
if not stats:
print("Returning as file is removed!!!!")
return
check_gcs_file_removal = PythonOperator(
task_id='check_gcs_file_removal',
python_callable= check_file_in_gcs,
#op_kwargs={'params': xyz},
#Pass bucket name and other details if needed by commentating above
dag=dag
)
возможно, вам потребуется установить пакеты Python для работы облачных библиотек Google. Пожалуйста, установите один из них снизу. (Не уверен, какой именно из них установить.Взято из моего virtualenv)
google-api-core==1.16.0
google-api-python-client==1.8.0
google-auth==1.12.0
google-auth-httplib2==0.0.3
google-auth-oauthlib==0.4.1
google-cloud-core==1.3.0
google-cloud-storage==1.27.0
Комментарии:
1. Это хорошо работает, если ваша задача должна выполняться только в течение небольшого промежутка времени, скажем, пару минут, но если задача должна выполняться в течение нескольких часов, то рабочий узел будет очень занят, поэтому такой подход не рекомендуется