#python #sql-server #stored-procedures #pyodbc
#python #sql-сервер #хранимые процедуры #pyodbc
Вопрос:
Я использую python для чтения CSV-файлов с помощью Pandas, исправления нескольких полей, а затем записи данных строка за строкой в таблицу на SQL Server. Массовый импорт на сервере отключен — также, потому что в конечном итоге таких файлов будет десятки, для автоматизации загрузки и приема файлов. Я вижу, что это занимает минуты, но для запуска требуются ЧАСЫ.
Я знаю, что мог бы массово загрузить этот материал за несколько секунд, если бы это было включено, но это может оказаться невозможным.
Проблема в том, что использование python может занимать от 1 до 3 часов за запуск. Это неприемлемо. Я хотел бы знать, есть ли более быстрый способ выполнить эту загрузку. Могу ли я что-нибудь сделать с таблицей, чтобы ускорить импорт, или другой способ кодирования.
Вот пример кода, который я использую:
def ingest_glief_reporting_exceptions_csv():
global conn
global cursor
filename = r"20200824-0800-gleif-goldencopy-repex-golden-copy.csv"
# filename = r"repex_1K.csv"
full_filename = os.path.join(raw_data_dir, filename)
sql_str = "exec dbo.util_get_gleif_last_reporting_exception"
cursor.execute(sql_str)
last_lei = ''
for result in cursor.fetchall():
last_lei = result[0]
# "repex" is short for "reporting exceptions", shorten the headers
repex_headers = [
'LEI',
'ExceptionCategory',
'ExceptionReason1',
'ExceptionReason2',
'ExceptionReason3',
'ExceptionReason4',
'ExceptionReason5',
'ExceptionReference1',
'ExceptionReference2',
'ExceptionReference3',
'ExceptionReference4',
'ExceptionReference5'
]
df = pd.read_csv(full_filename, header=0, quotechar='"')
# Change to the column headers generated in VBA
df.columns = repex_headers
for colname in df.columns:
df[colname] = df[colname].astype(str)
df[colname] = df[colname].replace({'nan': ''})
place_holder = '?,?'
for i in range(1, len(repex_headers)):
place_holder = ',?'
sql_str = "exec save_gleif_reporting_exception " place_holder
row_count = 0
row = dict()
do_not_upload = True
if last_lei == '':
do_not_upload = False # There was no last uploaded record, so we can start now
for index, row in df.iterrows():
row_count = 1
if do_not_upload:
if row['LEI'] == last_lei:
do_not_upload = False
continue
else:
continue
values = (
row['LEI'],
row['ExceptionCategory'],
row['ExceptionReason1'],
row['ExceptionReason2'],
row['ExceptionReason3'],
row['ExceptionReason4'],
row['ExceptionReason5'],
row['ExceptionReference1'],
row['ExceptionReference2'],
row['ExceptionReference3'],
row['ExceptionReference4'],
row['ExceptionReference5'],
filename
)
if index % 1000 == 0:
print("Imported %s rows" % (index))
# print(values)
# print("processing row ", row_count)
# return Key is the unique ID the database generated as it inserted this row of data.
error_sql_str = "exec log_message ?,?,?,?,?, ?,?,?,?"
connection_failures = 0
connection_failing = True
while connection_failures < 3 and connection_failing:
try:
return_key = cursor.execute(sql_str, values).fetchval()
except pyodbc.OperationalError as e:
connection_failures = 1
connection_failing = True
print("Connection issue. connection failures = ", connection_failures)
time.sleep(30) # wait 30 seconds and go to the top of the loop to try again.
continue
except pyodbc.ProgrammingError as e:
print("Bad field ", values)
error_values = (
'ERROR',
__file__,
filename,
'gleif_reporting_exceptions',
row['LEI'],
'',
'',
'',
str(e)
)
return_key = cursor.execute(error_sql_str, error_values).fetchval()
connection_failures = 0
connection_failures = 0
connection_failing = False
if connection_failures >= 3:
print("Unable to reconnect after 3 tries")
exit(1)
conn.close()
return
Я открываю базу данных следующим образом:
def init_connection(server_name, db_name):
"""
Connect to SQL Server database
:param server_name:
:param db_name:
:return:
"""
pyodbc.pooling = False
try:
conn = pyodbc.connect(
r'DRIVER={ODBC Driver 17 for SQL Server};SERVER=' server_name ';
Database=' db_name ';Trusted_Connection=yes;', timeout=5, autocommit=True)
except Exception as e:
print("Unable to connect to database [" db_name '] and server [' server_name ']')
print(e)
exit(1)
cursor = conn.cursor()
return [conn, cursor]
Хорошо.
Таблица определяется следующим образом:
CREATE TABLE [dbo].[gleif_exceptions](
[id] [bigint] IDENTITY(1,1) NOT NULL,
[ida_last_update_date] [datetime] NULL,
[ida_last_update_source_file] [nvarchar](500) NULL,
[LEI] [nvarchar](500) NULL,
[ExceptionCategory] [nvarchar](500) NULL,
[ExceptionReason1] [nvarchar](500) NULL,
[ExceptionReason2] [nvarchar](500) NULL,
[ExceptionReason3] [nvarchar](500) NULL,
[ExceptionReason4] [nvarchar](500) NULL,
[ExceptionReason5] [nvarchar](500) NULL,
[ExceptionReference1] [nvarchar](500) NULL,
[ExceptionReference2] [nvarchar](500) NULL,
[ExceptionReference3] [nvarchar](500) NULL,
[ExceptionReference4] [nvarchar](500) NULL,
[ExceptionReference5] [nvarchar](500) NULL
) ON [PRIMARY]
GO
Вот несколько примеров данных:
LEI,Exception.Category,Exception.Reason.1,Exception.Reason.2,Exception.Reason.3,Exception.Reason.4,Exception.Reason.5,Exception.Reference.1,Exception.Reference.2,Exception.Reference.3,Exception.Reference.4,Exception.Reference.5
004L5FPTUREIWK9T2N63,DIRECT_ACCOUNTING_CONSOLIDATION_PARENT,NON_CONSOLIDATING,,,,,,,,,
00EHHQ2ZHDCFXJCPCL46,DIRECT_ACCOUNTING_CONSOLIDATION_PARENT,NON_CONSOLIDATING,,,,,,,,,
И вот соответствующая хранимая процедура, которую я вызываю для сохранения записи в таблице:
ALTER PROCEDURE [dbo].[save_gleif_reporting_exception]
@LEI [nvarchar] (500) = NULL,
@ExceptionCategory [nvarchar] (500) = NULL,
@ExceptionReason1 [nvarchar] (500) = NULL,
@ExceptionReason2 [nvarchar] (500) = NULL,
@ExceptionReason3 [nvarchar] (500) = NULL,
@ExceptionReason4 [nvarchar] (500) = NULL,
@ExceptionReason5 [nvarchar] (500) = NULL,
@ExceptionReference1 [nvarchar] (500) = NULL,
@ExceptionReference2 [nvarchar] (500) = NULL,
@ExceptionReference3 [nvarchar] (500) = NULL,
@ExceptionReference4 [nvarchar] (500) = NULL,
@ExceptionReference5 [nvarchar] (500) = NULL,
@ida_last_update_source_file [nvarchar] (500) NULL
AS
BEGIN
-- SET NOCOUNT ON added to prevent extra result sets from
-- interfering with SELECT statements.
SET NOCOUNT ON;
-- Insert statements for procedure here
INSERT INTO dbo.gleif_reporting_exceptions(
[LEI],
[ExceptionCategory],
[ExceptionReason1],
[ExceptionReason2],
[ExceptionReason3],
[ExceptionReason4],
[ExceptionReason5],
[ExceptionReference1],
[ExceptionReference2],
[ExceptionReference3],
[ExceptionReference4],
[ExceptionReference5],
[ida_last_update_date],
[ida_last_update_source_file]
)
VALUES (
@LEI,
@ExceptionCategory,
@ExceptionReason1,
@ExceptionReason2,
@ExceptionReason3,
@ExceptionReason4,
@ExceptionReason5,
@ExceptionReference1,
@ExceptionReference2,
@ExceptionReference3,
@ExceptionReference4,
@ExceptionReference5,
GETDATE(),
@ida_last_update_source_file
)
SELECT @@IDENTITY
END
Примечание 1: хотя я объявляю строку как nvarchar (500), большинство из них не такие длинные. Я не думаю, что это имеет значение. Я пробовал использовать более короткие определения строк, и для запуска процедуры все еще требуется очень много времени.
Примечание 2: Это всего лишь один пример из 7 на данный момент. Самые маленькие таблицы содержат порядка нескольких 10-х K-строк, а их количество достигает нескольких миллионов. Количество столбцов варьируется от 7 до примерно 230.
Комментарии:
1. Похоже, что ваш код ничего не делает с
return_key
. Используется ли он где-либо еще, т. Е. Вам действительно нужно его извлечь?2. Я не использую его в данном случае. Я заимствовал код из другой работы, которую я выполнял ранее, где я использовал return_key. Я могу извлечь это и посмотреть, ускорит ли это процесс.
3. Хорошо, тогда вы можете вставить непосредственно в таблицу вместо использования хранимой процедуры?
Ответ №1:
Поскольку вам не нужно возвращаемое значение из хранимой процедуры, вы должны иметь возможность просто использовать to_sql
метод pandas для вставки строк непосредственно в таблицу. Этот код …
from time import time
import pandas as pd
import sqlalchemy as sa
from_engine = sa.create_engine("mssql pyodbc://@mssqlLocal64")
to_engine = sa.create_engine(
"mssql pyodbc://sa:_whatever_@192.168.0.199/mydb"
"?driver=ODBC Driver 17 for SQL Server",
fast_executemany=False,
)
# set up test
to_cnxn = to_engine.raw_connection()
to_cnxn.execute("TRUNCATE TABLE MillionRows")
to_cnxn.commit()
num_rows_to_upload = 10000
df = pd.read_sql_query(
f"SELECT TOP {num_rows_to_upload} "
"[TextField], [LongIntegerField], [DoubleField], [varchar_column] "
"FROM MillionRows ORDER BY ID",
from_engine,
)
# run test
t0 = time()
df.to_sql("MillionRows", to_engine, index=False, if_exists="append")
s = f"{(time() - t0):0.1f} seconds"
print(f"uploading {num_rows_to_upload:,d} rows took {s}")
… представляет примерно тот же внутренний уровень усилий, что и то, что вы делаете сейчас, т. Е. загружаете каждую отдельную строку как отдельный .execute
вызов. Результатом является
uploading 10,000 rows took 60.2 seconds
Однако простое изменение to_engine
для использования fast_executemany=True
приводит к
uploading 10,000 rows took 1.4 seconds
Комментарии:
1. Извините. Вчера и большую часть сегодняшнего дня была предпринята еще одна попытка. Надеюсь, я рассмотрю это до конца дня. Если это сработает для меня, я проверю это. Независимо от того, кто голосует за.
2. Мне все еще нужно решить некоторые проблемы с моим администратором базы данных, но я думаю, что это сработает. tx
Ответ №2:
Отключите автоматическую фиксацию
conn = pyodbc.connect(
r'DRIVER={ODBC Driver 17 for SQL Server};SERVER=' server_name ';
Database=' db_name ';Trusted_Connection=yes;', timeout=5, autocommit=False)
и зафиксируйте здесь и в конце цикла.
if index % 1000 == 0:
print("Imported %s rows" % (index))
При автоматической фиксации вам нужно дождаться сохранения файла журнала на диск после каждой строки.
Для дальнейшей оптимизации, если вы используете SQL 2016 , используйте JSON для отправки пакетов строк на SQL Server, анализируя на стороне сервера с помощью OPENJSON.
Комментарии:
1. Изменение фиксации, похоже, не приводит к заметному изменению скорости выполнения. Я взгляну на путь OPENJSON.
2. Пожалуйста, добавьте к вопросу вашу наблюдаемую скорость вставки в строках / сек и (если SQL 2016) повторный
select * from sys.dm_exec_session_wait_stats where session_id = @@spid
запуск после загрузки. Для SQL-сервера в локальной сети скорость вставки одной строки с пакетной обработкой транзакций должна составлять 100 с / с. 1000 в секунду при использовании JSON и 10000 в секунду при реальной массовой загрузке.