Воздушный поток — оператор bigquery_to_gcs с множественным назначением вывода

#google-cloud-platform #google-bigquery #google-cloud-storage #airflow

#google-cloud-platform #google-bigquery #google-облачное хранилище #воздушный поток

Вопрос:

Я ежедневно запускаю задание airflow, которое выполняет запрос aa и сохраняет его в таблице в большом запросе, а затем в другом операторе, который копирует выходные данные в GCS.

Теперь я заметил, что, когда размер файла превышает 200 МБ, Airflow делит его на несколько файлов в GCS. Это нормальное поведение для этого оператора? Если да, то как мы можем его переопределить?

Я читал, что этот оператор завершается с ошибкой, когда размеры файлов также превышают 1 ГБ

Ниже приведен отображаемый код

 start = DummyOperator(task_id="start")

output_table = f"test.{DESTINATION_DATASET}.test_{{{{ ts_nodash }}}}"
output_path = f"gs://test/test/{{{{ ts_nodash }}}}"

query = BigQueryOperator(
    task_id=f"test1",
    sql="sql/test.sql",
    use_legacy_sql=False,
    priority="BATCH",
    write_disposition="WRITE_TRUNCATE",
    destination_dataset_table=output_table,
    params={
        "event_dataset": EVENT_DATASET,
        "days_to_query": 30,
    },
    bigquery_conn_id="google_cloud_default",
    labels={"team": "test"},
    dag=dag
)
export_to_gcs = BigQueryToCloudStorageOperator(
    task_id=f"export_to_gcs",
    source_project_dataset_table=output_table,
    destination_cloud_storage_uris=[f"{output_path}/data-*.csv"],
    compression="NONE",
    export_format="CSV",
    bigquery_conn_id="google_cloud_default",
    labels={"team": "test"},
    dag=dag,
)

start >> query >> export_to_gcs

 

Ответ №1:

Первый BigQueryToCloudStorageOperator устарел. Вы должны использовать BigQueryToGCSOperator.

Да, это нормально, и на самом деле это не воздушный поток. Вот как работает BigQuery. Airflow не добавлял / не изменял функциональность BigQuery.

В частности, в вашем случае вы попросили BigQuery сгенерировать несколько файлов при установке нескольких URI в destination_cloud_storage_uris=[f"{output_path}/data-*.csv"],

Вы можете прочитать о различных вариантах destination_cloud_storage_uris (Single URI, Single wildcard URI, Multiple wildcard URI) в документах BigQuery здесь . Вы также должны знать об ограничениях.

Предполагая, что вы хотите, чтобы на выходе был один файл, просто измените свой код на один URI:

 destination_cloud_storage_uris=[f"{output_path}/data.csv"]