Airflow BigQueryOperator: как сохранить выходные данные в указанный столбец раздела вместо времени приема

#airflow

#воздушный поток

Вопрос:

Мне нравится передавать имя столбца раздела в BigQueryOperator вместо использования таблиц с разделением по времени приема.

         bq_cmd = BigQueryOperator (
            task_id=                    "task_id",
            sql=                        [query],
            destination_dataset_table=  destination_tbl,
            use_legacy_sql=             False,
            write_disposition=          'WRITE_TRUNCATE',
            time_partitioning=          {'time_partitioning_type':'DAY','time_partitioning_field': 'batch_date'},
            allow_large_results=        True,
            trigger_rule=               'all_success',
            query_params=               query_params,
            dag=                        dag
        )
  

Я попробовал описанный выше способ, но он не сработал, и итоговая таблица была создана с использованием столбца раздела как _PARTITIONTIME вместо batch_date

Комментарии:

1. Destination_tbl уже создан (другими словами, схема установлена, и когда запускается оператор большого запроса, он удаляет все в таблице, но не создает новую таблицу) или это новая таблица при каждом запуске?

2. @При первом запуске создается таблица с разделом времени приема, а при втором запуске создается новый раздел и так далее

3. Любая помощь очень ценится

Ответ №1:

Вставка параметров в формате ниже помогает в создании таблицы разделов на основе настраиваемого поля

 bq_cmd = BigQueryOperator (
    task_id=                    'task_name',
    sql=                        [query],
    destination_dataset_table=  destination_tbl,
    use_legacy_sql=             False,
    write_disposition=          'WRITE_TRUNCATE',
    time_partitioning=          {'field': 'execution_date',
                                 'type' : 'DAY'},
    allow_large_results=        True,
    trigger_rule=               'all_success',
    query_params=               query_params,
    dag=                        dag
)
  

В моем случае это была execution_date, которая присутствовала во входной таблице.