#python #pandas #google-bigquery
#python #pandas #google-bigquery
Вопрос:
Я пытаюсь загрузить данные из MYSQL в BigQuery. Я использую pandas, jaydebeapi и load_table_from_dataframe. При использовании того же самого, получаю ошибку ниже:
>>> job = client.load_table_from_dataframe(chunk, table_id)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/aarpan_roy/.local/lib/python2.7/site-packages/google/cloud/bigquery/client.py", line 1993, in load_table_from_dataframe
parquet_compression=parquet_compression,
File "/home/aarpan_roy/.local/lib/python2.7/site-packages/google/cloud/bigquery/_pandas_helpers.py", line 486, in dataframe_to_parquet
arrow_table = dataframe_to_arrow(dataframe, bq_schema)
File "/home/aarpan_roy/.local/lib/python2.7/site-packages/google/cloud/bigquery/_pandas_helpers.py", line 450, in dataframe_to_arrow
bq_to_arrow_array(get_column_or_index(dataframe, bq_field.name), bq_field)
File "/home/aarpan_roy/.local/lib/python2.7/site-packages/google/cloud/bigquery/_pandas_helpers.py", line 224, in bq_to_arrow_array
return pyarrow.Array.from_pandas(series, type=arrow_type)
File "pyarrow/array.pxi", line 755, in pyarrow.lib.Array.from_pandas
File "pyarrow/array.pxi", line 265, in pyarrow.lib.array
File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
TypeError: an integer is required
>>>
Пара моментов:
-
Моя исходная таблица существует и имеет схему ниже:
EMPID INTEGER, EMPNAME VARCHAR, STREETADRESS VARCHAR, REGION VARCHAR, STATE VARCHAR, COUNTRY VARCHAR, joining_date date, last_update_date TIMESTAMP(6) -- to hold till millisecond
-
Моя целевая таблица также существует в BigQuery, и ниже приведена схема:
create table if not exists `Project.dataset.table_name` (EMPID INT64, EMPNAME STRING, STREETADRESS STRING, REGION STRING, STATE STRING, COUNTRY STRING, joining_date DATE, last_update_date TIMESTAMP );
-
Ниже приведен код, который я использую:
import datetime from google.cloud import bigquery import pandas as pd import jaydebeapi import os client = bigquery.Client() table_id = "<project_id>.<dataset>.<target_table>" database_host='<IP Address>' database_user='<user id>' database_password='<password>' database_port='<port>' database_db='<database_name>' jclassname = "com.mysql.jdbc.Driver" url = "jdbc:mysql://{host}:{port}/{database}".format(host=database_host, port=database_port, database=database_db) driver_args = [database_user, database_password] jars = ["/<Home_Dir>/script/jars/mysql-connector-java-5.1.45.jar"] libs = None cnx = jaydebeapi.connect(jclassname, url, driver_args, jars=jars, libs=libs) query='select EMPID,EMPNAME,STREETADRESS,REGION,STATE,COUNTRY,joining_date,last_update_date from <table_name>' cursor = cnx.cursor() for chunk in pd.read_sql(query, cnx, coerce_float=True, params=None, parse_dates=None, columns=None,chunksize=500000):chunk.apply(lambda x: x.replace(u'r', u' ').replace(u'n', u' ') if isinstance(x, str) or isinstance(x, unicode) else x) job = client.load_table_from_dataframe(chunk, table_id) job.result()
Пожалуйста, помогите мне решить проблему. Я также пытался использовать LoadJobConfig, но появляется та же ошибка.
Комментарии:
1. Исправьте отступ и инструкции в
for chunk in pd.read_sql(...)
блоке.2. Было бы полезно, если бы вы напечатали схему результирующих фрагментов pandas. Ошибка, похоже, указывает на какое-то несоответствие схемы. Также какую версию Arrow и BQ-клиента вы используете?
3. Привет, я запускаю это из командной строки python. Вывод запроса приведен ниже: >>> EMPID EMPNAME STREETADRESS REGION STATE COUNTRY joining_date last_update_date 0 168050 Arpan Roy A 3/3 Ананданагар; Дакшин-Бехала-Роуд, Бехала, Западная Бенгалия, Индия. 2005-11-17 2020-08-23 08:57:30 ========== PyArrow ========== numpy>= 1.14 enum34> = 1.1.6 six> = 1.0.0 Как я могу проверить версию клиента BQ?
4. Всем привет, я нашел проблему. Для столбцов ДАТЫ и ВРЕМЕННОЙ МЕТКИ нам нужно использовать parse_dates=[‘joining_date’, ‘last_update_date’] в pd.read_sql: targettabledttscols=[‘joining_date’, ‘last_update_date’] для фрагмента в pd.read_sql(запрос, cnx, coerce_float= True, параметры = None, parse_dates=targettabledttscols, столбцы =Нет,размер фрагмента = 500000):chunk.apply(лямбда x: x.replace(u ‘ r’, u»).replace(u ‘ n’, u»), если isinstance(x, str) или isinstance(x, unicode) иначе x) Теперь мне нужно выяснить, почему миллисекунда не записывается в «фрагмент» в pd.read_sql. Кто-нибудь сталкивался с подобной проблемой и получил какое-либо решение? Помогите, пожалуйста
Ответ №1:
Это тот же код, который вы скопировали? я имею в виду, что насчет инструкций indents?
for chunk in pd.read_sql(query, cnx, coerce_float=True, params=None, parse_dates=None, columns=None,chunksize=500000):
chunk.apply(lambda x: x.replace(u'r', u' ').replace(u'n', u' ') if isinstance(x, str) or isinstance(x, unicode) else x)
job = client.load_table_from_dataframe(chunk, table_id)
job.result()
Исправьте приведенную выше строку.
Комментарии:
1. Привет, я запускаю его из командной строки Python, и, следовательно, отступ такой. Я получаю правильный вывод из фрейма данных. Упомянутый вами отступ — это реальный код python. >>> EMPID EMPNAME STREETADRESS REGION STATE COUNTRY дата присоединения last_update_date 0 168050 Арпан Рой А 3/3 Ананданагар; Дакшин Бехала Роуд Бехала Западная Бенгалия Индия 2005-11-17 2020-08-23 08:57:30