самый быстрый способ вставить 800 миллионов данных в базу данных postgres

#python #postgresql #multiprocessing #bulkinsert

Вопрос:

У меня есть скрипт, который читает некоторые файлы один за другим, очищает их и вставляет в базу данных postgres.

я пытался использовать многопроцессорную обработку python с использованием пулов, но на самом деле я обнаружил, что загрузка процессора все еще иногда достигает 30% и большую часть времени возвращается к 6%. так что это действительно так медленно.

Есть какие-нибудь предложения по его ускорению ?

Спасибо

 import os
import multiprocessing


path = 'data/'
arr = os.listdir(path)

connection = psycopg2.connect(
    user="postgres", password="blabla", host="127.0.0.1", port="5432", database="test"
)
cursor = connection.cursor()

postgres_insert_query = """ INSERT INTO mobile (data1, data2) VALUES (%s,%s) 
                        ON CONFLICT (data1) 
                        DO 
                        UPDATE SET data2 = EXCLUDED.data1 ;"""


def insert_data(key,record_to_insert, item):
    print(key)
    try:
        cursor.executemany(postgres_insert_query, record_to_insert)

        connection.commit()
        count = cursor.rowcount
        print(count, "Record inserted successfully into mobile table", item)

    except (Exception, psycopg2.Error) as error:
        print("Failed to insert record into mobile table", error)


i = 1

def process_data(item):
    print(item)

    global i
    records = []

    i =1
    with open(path item,'r') as file:
        for line in file:
            line = dataCleansing(line)
            records.append((line '-' str(i),'data2-' str(i) line))
  
            if len(records)==50000:
                insert_data(i,records,item)
                records=[]

        insert_data(i,records,item)
        records=[]
            
if __name__ == '__main__':
    a_pool = multiprocessing.Pool(6)

    result = a_pool.map(process_data, arr)

 

Комментарии:

1. Чтение данных с диска, вероятно, является здесь узким местом. Добавление параллелизма к задаче, связанной с вводом-выводом, может только увеличить перегрузку. Если вы сможете переместить каждый файл на отдельный вращающийся диск, это может повысить параллельную производительность. Если диски уже являются отдельными и/или SSD-дисками, вы, вероятно, уже увеличиваете пропускную способность ввода-вывода.

2. В соответствии с курсором docs psycopg2 : executemany «Предупреждение В текущей реализации этот метод не быстрее, чем выполнение функции execute() в цикле. Для повышения производительности вы можете использовать функции, описанные в пособиях по быстрому выполнению.» Хотя, если вы действительно хотите ускорить процесс, вы бы использовали КОПИРОВАНИЕ .

3. Вы не можете совместно использовать соединение psycopg2 между несколькими процессами, вы получите непредсказуемые ошибки или повреждение данных.