Как загрузить данные в формате JSON в таблицу BigQuery с помощью Python в Airflow

# #python #google-bigquery #airflow

Вопрос:

Я пытаюсь добавить данные в таблицу bigquery, используя оператор python в airflow. Когда задача выполняется, я получаю ошибки ниже

ОШИБКА — Не удалось сериализовать значение XCom в JSON. Если вы используете pickles вместо JSON для XCom, вам необходимо включить поддержку pickle для XCom в конфигурации воздушного потока.

ОШИБКА — Объект типа «LoadJob» не сериализуется в формате JSON

Код на Python

 from google.cloud import bigquery
import os

def load_pg_data_to_bq(**kwargs):
    with open("dags/prod_data_sync_bucket/airtimes_.json", 'r', encoding='utf-8') as json_data:
        json_object = json.load(json_data)

    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = r"dagskeysservice_key.json"

    ### Converts schema to BigQuery's expected format, 
    table_schema = [
        bigquery.SchemaField("id", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("phone_number", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("request_id", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("created_at", "DATETIME", mode="NULLABLE"),
        bigquery.SchemaField("updated_at", "DATETIME", mode="NULLABLE"),
        bigquery.SchemaField("amount", "FLOAT64", mode="NULLABLE"),
        bigquery.SchemaField("bq_updatetime", "DATETIME", mode="NULLABLE")
        ]

    project_id = <MY_PROJECT_ID>
    dataset_id = <MY_DATASET_ID>
    table_id = <MY_TABLE_ID>

    client  = bigquery.Client(project = project_id)
    dataset  = client.dataset(dataset_id)
    table = dataset.table(table_id)

    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        schema=table_schema,
    )
    job = client.load_table_from_json(json_object, table, job_config = job_config)

    print(job.result())
    return job.result()
 

Чего мне не хватает?

Комментарии:

1. Я попытался запустить ваш код в Airflow и получил ту же ошибку, о которой вы упомянули в вопросе. Чтобы устранить проблему, я попытался запустить тот же код с помощью клиентской библиотеки BigQuery за пределами среды Airflow, и он успешно работал. Таким образом, проблема заключается в стороне воздушного потока. Пожалуйста, создайте проблему на GitHub в репозитории Airflow, объяснив проблему.