# #google-cloud-platform #google-bigquery #airflow
Вопрос:
Я новичок в Apache Airflow. Моя задача-прочитать данные из облачного хранилища Google, преобразовать их и загрузить преобразованные данные в таблицу BigQuery. Я могу получать данные из облачного хранилища и напрямую хранить их в таблице BigQuery. Я не уверен, как включить функцию преобразования в этот конвейер.
Вот мой код:
# Import libraries needed for the operation
import airflow
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
# Default Argument
default_args = {
'owner': <OWNER_NAME>,
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=2),
}
# DAG Definition
dag = DAG('load_from_bucket_to_bq',
schedule_interval='0 * * * *',
default_args=default_args)
# Variable Configurations
BQ_CONN_ID = <CONN_ID>
BQ_PROJECT = <PROJECT_ID>
BQ_DATASET = <DATASET_ID>
with dag:
# Tasks
start = DummyOperator(
task_id='start'
)
upload = GoogleCloudStorageToBigQueryOperator(
task_id='load_from_bucket_to_bigquery',
bucket=<BUCKET_NAME>,
source_objects=['*.csv'],
schema_fields=[
{'name': 'Active_Cases', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'Country', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'Last_Update', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'New_Cases', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'New_Deaths', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'Total_Cases', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'Total_Deaths', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'Total_Recovered', 'type': 'STRING', 'mode': 'NULLABLE'},
],
destination_project_dataset_table=BQ_PROJECT '.' BQ_DATASET '.' <TABLE_NAME>,
write_disposition='WRITE_TRUNCATE',
google_cloud_storage_conn_id=BQ_CONN_ID,
bigquery_conn_id=BQ_CONN_ID,
dag = dag
)
end = DummyOperator(
task_id='end'
)
# Setting Dependencies
start >> upload >> end
Мы будем признательны за любую помощь в том, как действовать дальше. Спасибо.
Комментарии:
1. Какого рода трансформацией вы будете заниматься?
2. Я получаю данные о случаях COVID за каждый день. Я хочу сделать разницу в случаях для каждого дня и сохранить это в таблице BigQuery.
3. Вы можете попробовать создать функцию python, которая выполняет преобразование, и использовать PythonOperator в вашей DAG для вызова функции во время выполнения.
4. Есть ли оператор для получения данных из облачного хранилища и использования их в функции Python? Я не мог его найти, поэтому я напрямую использовал GoogleCloudStorageToBigQueryОператор.
5. Является ли GCSToLocalFilesystemOperator единственным оператором, загружающим данные из корзины GCP?
Ответ №1:
Опубликуйте разговор с @sachinmb27 в качестве ответа. Преобразование может быть помещено в функцию python и использовать PythonOperator для вызова функции преобразования во время выполнения. Более подробную информацию о том, какие операторы могут использоваться в Airflow, можно найти в документах оператора воздушного потока.