#python #postgresql #multiprocessing #bulkinsert
Вопрос:
У меня есть скрипт, который читает некоторые файлы один за другим, очищает их и вставляет в базу данных postgres.
я пытался использовать многопроцессорную обработку python с использованием пулов, но на самом деле я обнаружил, что загрузка процессора все еще иногда достигает 30% и большую часть времени возвращается к 6%. так что это действительно так медленно.
Есть какие-нибудь предложения по его ускорению ?
Спасибо
import os
import multiprocessing
path = 'data/'
arr = os.listdir(path)
connection = psycopg2.connect(
user="postgres", password="blabla", host="127.0.0.1", port="5432", database="test"
)
cursor = connection.cursor()
postgres_insert_query = """ INSERT INTO mobile (data1, data2) VALUES (%s,%s)
ON CONFLICT (data1)
DO
UPDATE SET data2 = EXCLUDED.data1 ;"""
def insert_data(key,record_to_insert, item):
print(key)
try:
cursor.executemany(postgres_insert_query, record_to_insert)
connection.commit()
count = cursor.rowcount
print(count, "Record inserted successfully into mobile table", item)
except (Exception, psycopg2.Error) as error:
print("Failed to insert record into mobile table", error)
i = 1
def process_data(item):
print(item)
global i
records = []
i =1
with open(path item,'r') as file:
for line in file:
line = dataCleansing(line)
records.append((line '-' str(i),'data2-' str(i) line))
if len(records)==50000:
insert_data(i,records,item)
records=[]
insert_data(i,records,item)
records=[]
if __name__ == '__main__':
a_pool = multiprocessing.Pool(6)
result = a_pool.map(process_data, arr)
Комментарии:
1. Чтение данных с диска, вероятно, является здесь узким местом. Добавление параллелизма к задаче, связанной с вводом-выводом, может только увеличить перегрузку. Если вы сможете переместить каждый файл на отдельный вращающийся диск, это может повысить параллельную производительность. Если диски уже являются отдельными и/или SSD-дисками, вы, вероятно, уже увеличиваете пропускную способность ввода-вывода.
2. В соответствии с курсором docs psycopg2 :
executemany
«Предупреждение В текущей реализации этот метод не быстрее, чем выполнение функции execute() в цикле. Для повышения производительности вы можете использовать функции, описанные в пособиях по быстрому выполнению.» Хотя, если вы действительно хотите ускорить процесс, вы бы использовали КОПИРОВАНИЕ .3. Вы не можете совместно использовать соединение psycopg2 между несколькими процессами, вы получите непредсказуемые ошибки или повреждение данных.