#google-cloud-platform #google-bigquery #google-cloud-storage #airflow
#google-cloud-platform #google-bigquery #google-облачное хранилище #воздушный поток
Вопрос:
Я ежедневно запускаю задание airflow, которое выполняет запрос aa и сохраняет его в таблице в большом запросе, а затем в другом операторе, который копирует выходные данные в GCS.
Теперь я заметил, что, когда размер файла превышает 200 МБ, Airflow делит его на несколько файлов в GCS. Это нормальное поведение для этого оператора? Если да, то как мы можем его переопределить?
Я читал, что этот оператор завершается с ошибкой, когда размеры файлов также превышают 1 ГБ
Ниже приведен отображаемый код
start = DummyOperator(task_id="start")
output_table = f"test.{DESTINATION_DATASET}.test_{{{{ ts_nodash }}}}"
output_path = f"gs://test/test/{{{{ ts_nodash }}}}"
query = BigQueryOperator(
task_id=f"test1",
sql="sql/test.sql",
use_legacy_sql=False,
priority="BATCH",
write_disposition="WRITE_TRUNCATE",
destination_dataset_table=output_table,
params={
"event_dataset": EVENT_DATASET,
"days_to_query": 30,
},
bigquery_conn_id="google_cloud_default",
labels={"team": "test"},
dag=dag
)
export_to_gcs = BigQueryToCloudStorageOperator(
task_id=f"export_to_gcs",
source_project_dataset_table=output_table,
destination_cloud_storage_uris=[f"{output_path}/data-*.csv"],
compression="NONE",
export_format="CSV",
bigquery_conn_id="google_cloud_default",
labels={"team": "test"},
dag=dag,
)
start >> query >> export_to_gcs
Ответ №1:
Первый BigQueryToCloudStorageOperator
устарел. Вы должны использовать BigQueryToGCSOperator.
Да, это нормально, и на самом деле это не воздушный поток. Вот как работает BigQuery. Airflow не добавлял / не изменял функциональность BigQuery.
В частности, в вашем случае вы попросили BigQuery сгенерировать несколько файлов при установке нескольких URI в destination_cloud_storage_uris=[f"{output_path}/data-*.csv"],
Вы можете прочитать о различных вариантах destination_cloud_storage_uris
(Single URI, Single wildcard URI, Multiple wildcard URI) в документах BigQuery здесь . Вы также должны знать об ограничениях.
Предполагая, что вы хотите, чтобы на выходе был один файл, просто измените свой код на один URI:
destination_cloud_storage_uris=[f"{output_path}/data.csv"]