Как преобразовать данные перед загрузкой в BigQuery в Apache Airflow?

# #google-cloud-platform #google-bigquery #airflow

Вопрос:

Я новичок в Apache Airflow. Моя задача-прочитать данные из облачного хранилища Google, преобразовать их и загрузить преобразованные данные в таблицу BigQuery. Я могу получать данные из облачного хранилища и напрямую хранить их в таблице BigQuery. Я не уверен, как включить функцию преобразования в этот конвейер.

Вот мой код:

 # Import libraries needed for the operation
import airflow
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

# Default Argument
default_args = {
    'owner': <OWNER_NAME>,
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=2),
}

# DAG Definition
dag = DAG('load_from_bucket_to_bq',
schedule_interval='0 * * * *',
default_args=default_args)

# Variable Configurations
BQ_CONN_ID = <CONN_ID>
BQ_PROJECT = <PROJECT_ID>
BQ_DATASET = <DATASET_ID>

with dag:
    # Tasks
    start = DummyOperator(
        task_id='start'
    )

    upload = GoogleCloudStorageToBigQueryOperator(
        task_id='load_from_bucket_to_bigquery',
        bucket=<BUCKET_NAME>,
        source_objects=['*.csv'],
        schema_fields=[
            {'name': 'Active_Cases', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Country', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Last_Update', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'New_Cases', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'New_Deaths', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Total_Cases', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Total_Deaths', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Total_Recovered', 'type': 'STRING', 'mode': 'NULLABLE'},
        ],
        destination_project_dataset_table=BQ_PROJECT   '.'   BQ_DATASET   '.'   <TABLE_NAME>,
        write_disposition='WRITE_TRUNCATE',
        google_cloud_storage_conn_id=BQ_CONN_ID,
        bigquery_conn_id=BQ_CONN_ID,
        dag = dag
    )

    end = DummyOperator(
        task_id='end'
    )

    # Setting Dependencies
    start >> upload >> end

 

Мы будем признательны за любую помощь в том, как действовать дальше. Спасибо.

Комментарии:

1. Какого рода трансформацией вы будете заниматься?

2. Я получаю данные о случаях COVID за каждый день. Я хочу сделать разницу в случаях для каждого дня и сохранить это в таблице BigQuery.

3. Вы можете попробовать создать функцию python, которая выполняет преобразование, и использовать PythonOperator в вашей DAG для вызова функции во время выполнения.

4. Есть ли оператор для получения данных из облачного хранилища и использования их в функции Python? Я не мог его найти, поэтому я напрямую использовал GoogleCloudStorageToBigQueryОператор.

5. Является ли GCSToLocalFilesystemOperator единственным оператором, загружающим данные из корзины GCP?

Ответ №1:

Опубликуйте разговор с @sachinmb27 в качестве ответа. Преобразование может быть помещено в функцию python и использовать PythonOperator для вызова функции преобразования во время выполнения. Более подробную информацию о том, какие операторы могут использоваться в Airflow, можно найти в документах оператора воздушного потока.