скопируйте файл из GCS на Google Диск через Airflow

#python #python-3.x #google-cloud-platform #airflow

#python #python-3.x #google-облачная платформа #воздушный поток

Вопрос:

Я хотел бы создать экземпляр задачи (через airflow), которая копирует файл из корзины в облачном хранилище Google на диск.

Я использую выделенный оператор, который находится в :

 from airflow.contrib.operators.gcs_to_gdrive_operator import GcsToGDriveOperator
  

затем оператор :

 copy_files = GcsToGDriveOperator(
        task_id="copy_files",
        source_bucket=GCS_BUCKET_ID,
        source_object='{}/{}/forecasted/*'.format(COUNTRY_TRIGRAM, PRED_START_RANGE),
        destination_object="content/drive/Shared Drives/FORECAST_TEST",
        gcp_conn_id="airflow_service_account_conn_w_drive"
    )
  

Задача выполнена успешно, но не копируйте файл в «целевой объект», который является частью, в которую я не уверен, что вставить.

Ответ №1:

Просматривая исходный код Airflow GcsToGDriveOperator, я предполагаю, что Airflow использует gcs_hook. метод download(), загружающий файлы из GCS и gdrive_hook.upload_file() загружает эти объекты в целевое местоположение Gdrive.

Учитывая сказанное выше, gcs_hook.download() метод записывает каждое действие для успешного результата операции:

 self.log.info('File downloaded to %s', filename)
  

Аналогично, gdrive_hook.upload_file() записывает каждую итерацию загрузки файла в виде сообщения журнала:

 self.log.info("File %s uploaded to gdrive://%s.", local_location, remote_location)
  

Несмотря на то, что задача выполнена успешно, я полагаю, что вы можете зафиксировать вышеупомянутые события в журналах Airflow в рамках конкретной задачи, ища фактические пути к источнику и месту назначения, полученные из GcsToGDriveOperator() определения.

Вы можете рассмотреть даже проверку журнала Airflow workers, подключившись к кластеру GKE и запустив kubectl инструмент командной строки:

 kubectl logs deployment/airflow-worker -n $(kubectl get ns| grep composer*| awk '{print $1}') -c airflow-worker | grep 'Executing copy'
  

Комментарии:

1. Я могу отслеживать журналы airflow, в которых говорится, что мой файл «/ tmp / tmpwj__bkqs » был загружен в gdrive:// > «содержимое / диск / Общие диски / FORECAST_TEST»/000000. это действительно путь, который я указал в объекте назначения. Но моя проблема заключается не в том, чтобы отслеживать, где находится мой файл, а в том, чтобы отправить файл в нужное место, чтобы указать правильный путь, я не нахожу свой скопированный файл там, где я хотел бы его найти, это означает, что я не указываю путь надлежащим образом, поэтому я ищу правильный «синтаксический» способ его указания

2. Эта тема может быть актуальна для вас. Помогает ли это вам получить правильный синтаксис пути?

3. Согласно этой теме, я думаю, что мой путь правильно определен, но он все равно не сработал, я думаю, что GcsToGDriveOperator указывает на привод воздушного потока вместо моего локального воздушного потока, но я не нашел ни одного аргумента, который говорил бы «это мой диск»

4. GcsToGDriveOperator фактически используется для получения файлов из корзины GCS, но не с локального диска.

5. О, я понимаю .. итак, я выбрал неправильный путь, вы знали другой способ переноса на локальный диск? Спасибо