# #python #google-bigquery #airflow
Вопрос:
Я пытаюсь добавить данные в таблицу bigquery, используя оператор python в airflow. Когда задача выполняется, я получаю ошибки ниже
ОШИБКА — Не удалось сериализовать значение XCom в JSON. Если вы используете pickles вместо JSON для XCom, вам необходимо включить поддержку pickle для XCom в конфигурации воздушного потока.
ОШИБКА — Объект типа «LoadJob» не сериализуется в формате JSON
Код на Python
from google.cloud import bigquery
import os
def load_pg_data_to_bq(**kwargs):
with open("dags/prod_data_sync_bucket/airtimes_.json", 'r', encoding='utf-8') as json_data:
json_object = json.load(json_data)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = r"dagskeysservice_key.json"
### Converts schema to BigQuery's expected format,
table_schema = [
bigquery.SchemaField("id", "STRING", mode="NULLABLE"),
bigquery.SchemaField("phone_number", "STRING", mode="NULLABLE"),
bigquery.SchemaField("request_id", "STRING", mode="NULLABLE"),
bigquery.SchemaField("created_at", "DATETIME", mode="NULLABLE"),
bigquery.SchemaField("updated_at", "DATETIME", mode="NULLABLE"),
bigquery.SchemaField("amount", "FLOAT64", mode="NULLABLE"),
bigquery.SchemaField("bq_updatetime", "DATETIME", mode="NULLABLE")
]
project_id = <MY_PROJECT_ID>
dataset_id = <MY_DATASET_ID>
table_id = <MY_TABLE_ID>
client = bigquery.Client(project = project_id)
dataset = client.dataset(dataset_id)
table = dataset.table(table_id)
job_config = bigquery.LoadJobConfig(
write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
schema=table_schema,
)
job = client.load_table_from_json(json_object, table, job_config = job_config)
print(job.result())
return job.result()
Чего мне не хватает?
Комментарии:
1. Я попытался запустить ваш код в Airflow и получил ту же ошибку, о которой вы упомянули в вопросе. Чтобы устранить проблему, я попытался запустить тот же код с помощью клиентской библиотеки BigQuery за пределами среды Airflow, и он успешно работал. Таким образом, проблема заключается в стороне воздушного потока. Пожалуйста, создайте проблему на GitHub в репозитории Airflow, объяснив проблему.