Ошибка при использовании load_table_from_dataframe

#python #pandas #google-bigquery

#python #pandas #google-bigquery

Вопрос:

Я пытаюсь загрузить данные из MYSQL в BigQuery. Я использую pandas, jaydebeapi и load_table_from_dataframe. При использовании того же самого, получаю ошибку ниже:

 >>> job = client.load_table_from_dataframe(chunk, table_id)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/aarpan_roy/.local/lib/python2.7/site-packages/google/cloud/bigquery/client.py", line 1993, in load_table_from_dataframe
    parquet_compression=parquet_compression,
  File "/home/aarpan_roy/.local/lib/python2.7/site-packages/google/cloud/bigquery/_pandas_helpers.py", line 486, in dataframe_to_parquet
    arrow_table = dataframe_to_arrow(dataframe, bq_schema)
  File "/home/aarpan_roy/.local/lib/python2.7/site-packages/google/cloud/bigquery/_pandas_helpers.py", line 450, in dataframe_to_arrow
    bq_to_arrow_array(get_column_or_index(dataframe, bq_field.name), bq_field)
  File "/home/aarpan_roy/.local/lib/python2.7/site-packages/google/cloud/bigquery/_pandas_helpers.py", line 224, in bq_to_arrow_array
    return pyarrow.Array.from_pandas(series, type=arrow_type)
  File "pyarrow/array.pxi", line 755, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 265, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
TypeError: an integer is required
>>> 
  

Пара моментов:

  1. Моя исходная таблица существует и имеет схему ниже:

     EMPID INTEGER, 
    EMPNAME VARCHAR, 
    STREETADRESS VARCHAR, 
    REGION VARCHAR, 
    STATE VARCHAR, 
    COUNTRY VARCHAR, 
    joining_date date, 
    last_update_date TIMESTAMP(6) -- to hold till millisecond
      
  2. Моя целевая таблица также существует в BigQuery, и ниже приведена схема:

     create table if not exists `Project.dataset.table_name`
    (EMPID INT64,
    EMPNAME STRING,
    STREETADRESS STRING,
    REGION STRING,
    STATE STRING,
    COUNTRY STRING,
    joining_date DATE,
    last_update_date TIMESTAMP
    );
      
  3. Ниже приведен код, который я использую:

     import datetime
    from google.cloud import bigquery
    import pandas as pd
    import jaydebeapi
    import os
    
    client = bigquery.Client()
    
    table_id = "<project_id>.<dataset>.<target_table>"
    database_host='<IP Address>'
    database_user='<user id>'
    database_password='<password>'
    database_port='<port>'
    database_db='<database_name>'
    
    jclassname = "com.mysql.jdbc.Driver"
          url = "jdbc:mysql://{host}:{port}/{database}".format(host=database_host, port=database_port, database=database_db)
          driver_args = [database_user, database_password]
          jars = ["/<Home_Dir>/script/jars/mysql-connector-java-5.1.45.jar"]
          libs = None
    
    cnx = jaydebeapi.connect(jclassname, url, driver_args, jars=jars, libs=libs)
    query='select EMPID,EMPNAME,STREETADRESS,REGION,STATE,COUNTRY,joining_date,last_update_date from <table_name>'
    
    cursor = cnx.cursor()
    for chunk in pd.read_sql(query, cnx, coerce_float=True, params=None, parse_dates=None, columns=None,chunksize=500000):chunk.apply(lambda x: x.replace(u'r', u' ').replace(u'n', u' ') if isinstance(x, str) or isinstance(x, unicode) else x)
    
    job = client.load_table_from_dataframe(chunk, table_id)
    job.result()
      

Пожалуйста, помогите мне решить проблему. Я также пытался использовать LoadJobConfig, но появляется та же ошибка.

Комментарии:

1. Исправьте отступ и инструкции в for chunk in pd.read_sql(...) блоке.

2. Было бы полезно, если бы вы напечатали схему результирующих фрагментов pandas. Ошибка, похоже, указывает на какое-то несоответствие схемы. Также какую версию Arrow и BQ-клиента вы используете?

3. Привет, я запускаю это из командной строки python. Вывод запроса приведен ниже: >>> EMPID EMPNAME STREETADRESS REGION STATE COUNTRY joining_date last_update_date 0 168050 Arpan Roy A 3/3 Ананданагар; Дакшин-Бехала-Роуд, Бехала, Западная Бенгалия, Индия. 2005-11-17 2020-08-23 08:57:30 ========== PyArrow ========== numpy>= 1.14 enum34> = 1.1.6 six> = 1.0.0 Как я могу проверить версию клиента BQ?

4. Всем привет, я нашел проблему. Для столбцов ДАТЫ и ВРЕМЕННОЙ МЕТКИ нам нужно использовать parse_dates=[‘joining_date’, ‘last_update_date’] в pd.read_sql: targettabledttscols=[‘joining_date’, ‘last_update_date’] для фрагмента в pd.read_sql(запрос, cnx, coerce_float= True, параметры = None, parse_dates=targettabledttscols, столбцы =Нет,размер фрагмента = 500000):chunk.apply(лямбда x: x.replace(u ‘ r’, u»).replace(u ‘ n’, u»), если isinstance(x, str) или isinstance(x, unicode) иначе x) Теперь мне нужно выяснить, почему миллисекунда не записывается в «фрагмент» в pd.read_sql. Кто-нибудь сталкивался с подобной проблемой и получил какое-либо решение? Помогите, пожалуйста

Ответ №1:

Это тот же код, который вы скопировали? я имею в виду, что насчет инструкций indents?

 for chunk in pd.read_sql(query, cnx, coerce_float=True, params=None, parse_dates=None, columns=None,chunksize=500000):
    chunk.apply(lambda x: x.replace(u'r', u' ').replace(u'n', u' ') if isinstance(x, str) or isinstance(x, unicode) else x)
    job = client.load_table_from_dataframe(chunk, table_id)
    job.result()
  

Исправьте приведенную выше строку.

Комментарии:

1. Привет, я запускаю его из командной строки Python, и, следовательно, отступ такой. Я получаю правильный вывод из фрейма данных. Упомянутый вами отступ — это реальный код python. >>> EMPID EMPNAME STREETADRESS REGION STATE COUNTRY дата присоединения last_update_date 0 168050 Арпан Рой А 3/3 Ананданагар; Дакшин Бехала Роуд Бехала Западная Бенгалия Индия 2005-11-17 2020-08-23 08:57:30