psycopg2.errors.UndefinedTable: таблица «live» не существует, передавая большой фрейм данных в базу данных postgres

#python #pandas #postgresql #multithreading #multiprocessing

#python #панды #postgresql #многопоточность #многопроцессорная обработка

Вопрос:

Я пытаюсь отправить большой фрейм данных в базу данных Postgres, для этого использую df_to_sql, проблема в том, что для выполнения этой задачи требуется вечность, у меня до 1 миллиона строк фрейма данных, поэтому я решил использовать многопроцессорную обработку python.

ниже приведены мои коды

 def parallelize_push_to_db(df, table, chunk_size):
    chunk = split_df(df, chunk_size=chunk_size)
    chunk_len = len(chunk)
    processes = list()
    start_time = time.time()
    for i in range(chunk_len):
        process = Process(target=pussh_df_to_db, args=(chunk[i], table))
        process.start()
        processes.append(process)
        for pro in processes:
            pro.join()
    end_time = time.time() - start_time
    print(f'parra {end_time}')

def split_df(df, chunk_size):
    chunks = list()
    split_size = math.ceil(len(df) / chunk_size)
    for i in range(split_size):
        chunks.append(df[i * chunk_size:(i   1) * chunk_size])
    return chunks

def pussh_df_to_db(df, table):
    # i think we should append the new df to the data in the db if not empty
    base = DB_Base()
    base.pg_cur.execute(f"DROP TABLE IF EXISTS {table}")
    print(df.shape)
    print('running...........to send dt to postgres')
    df.to_sql(table, con=base.sql_alchemy_engine_conn, if_exists='replace')

if "__name__" == "__main__":
    parallelize_push_to_db(check_data, 'live', 1000)
 

некоторая часть кода выполняется, но завершается сбоем в более поздней части с приведенной ниже ошибкой

 in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedTable) table "user_user_recommendations_AAa" does not exist

[SQL: 
DROP TABLE "live"]
 

я думаю, что каждый процесс пытается создать новую таблицу и удалить старую, я действительно не знаю, как это решить, любая помощь??

Комментарии:

1. Да, каждый процесс будет пытаться удалить таблицу. Почему вы удаляете таблицу вместо того, чтобы просто добавлять строки (if_exists=»добавить» вместо if_exists=’заменить’)? Я не уверен, почему возникла ошибка, поскольку to_sql снова создаст таблицу, поэтому вы не должны получать таблицу, которой не существует. .

2. Что произойдет, если вы создадите эту таблицу в postgres перед запуском этого кода, удалите оператор drop table и используйте ‘append’ вместо ‘replace’?

3. хорошо, попробуем это

Ответ №1:

Прежде чем я начну, я хотел бы сказать, что вам будет лучше использовать модули потоков, прежде чем принимать решения об использовании такого рода методов, вы должны понимать задачу, которую вы выполняете. некоторые задачи являются задачами процессора, в то время как другие являются тяжелыми задачами ввода-вывода. поэтому постарайтесь понять все это, прежде чем выбирать, какой из них вы хотите, но иногда они оба будут работать.

 def parallelize_push_to_db(df, table, chunk_size):
    chunk = split_df(df, chunk_size=chunk_size)
    chunk_len = len(chunk)
    processes = list()
    start_time = time.time()
    for i in range(chunk_len):
        process = Process(target=pussh_df_to_db, args=(chunk[i], table))
        process.start()
        processes.append(process)
    for pro in processes:
        pro.join()
    end_time = time.time() - start_time
    print(f'parra {end_time}')
 

первый ответ также будет работать, но решение вашего вопроса таково..

Ответ №2:

Некоторые альтернативные способы:

  1. сохраните df в файл csv и скопируйте в базу данных postgresql.

см.: https://www.postgresql.org/docs/13/sql-copy.html

 COPY table_name [ ( column_name [, ...] ) ]
    FROM { 'filename' | PROGRAM 'command' | STDIN }
    [ [ WITH ] ( option [, ...] ) ]

where option can be one of:

    FORMAT format_name
    OIDS [ boolean ]
    FREEZE [ boolean ]
    DELIMITER 'delimiter_character'
    NULL 'null_string'
    HEADER [ boolean ]
    QUOTE 'quote_character'
    ESCAPE 'escape_character'
    FORCE_QUOTE { ( column_name [, ...] ) | * }
    FORCE_NOT_NULL ( column_name [, ...] )
    ENCODING 'encoding_name'
 
  1. используйте быстрый способ сброса df в db.

см.: https://code.i-harness.com/en/q/16089da

 import io
import csv
from sqlalchemy import create_engine
def df2db(df_a, table_name, engine):
    output = io.StringIO()
    # ignore the index
    # df_a.to_csv(output, sep='t', index = False, header = False, quoting=csv.QUOTE_NONE)
    df_a.to_csv(output, sep='t', index = False, header = False, quoting=csv.QUOTE_NONE, escapechar='\')
    output.getvalue()
    # jump to start of stream
    output.seek(0)
    
    # engine -> from sqlalchemy import create_engine
    connection = engine.raw_connection() 
    cursor = connection.cursor()
    # null value become ''
    cursor.copy_from(output,table_name,null='')
    connection.commit()
    cursor.close()

engine = create_engine('postgresql psycopg2://username:passport@localhost:5432/databasname')
df2db(df, table_name, engine)