Выполнение кода Python для вызова хранимой процедуры SQL Server для приема CSV-файлов занимает ЧАСЫ

#python #sql-server #stored-procedures #pyodbc

#python #sql-сервер #хранимые процедуры #pyodbc

Вопрос:

Я использую python для чтения CSV-файлов с помощью Pandas, исправления нескольких полей, а затем записи данных строка за строкой в таблицу на SQL Server. Массовый импорт на сервере отключен — также, потому что в конечном итоге таких файлов будет десятки, для автоматизации загрузки и приема файлов. Я вижу, что это занимает минуты, но для запуска требуются ЧАСЫ.

Я знаю, что мог бы массово загрузить этот материал за несколько секунд, если бы это было включено, но это может оказаться невозможным.

Проблема в том, что использование python может занимать от 1 до 3 часов за запуск. Это неприемлемо. Я хотел бы знать, есть ли более быстрый способ выполнить эту загрузку. Могу ли я что-нибудь сделать с таблицей, чтобы ускорить импорт, или другой способ кодирования.

Вот пример кода, который я использую:

 def ingest_glief_reporting_exceptions_csv():
    global conn
    global cursor
    filename = r"20200824-0800-gleif-goldencopy-repex-golden-copy.csv"
    # filename = r"repex_1K.csv"

    full_filename = os.path.join(raw_data_dir, filename)

    sql_str = "exec dbo.util_get_gleif_last_reporting_exception"
    cursor.execute(sql_str)
    last_lei = ''
    for result in cursor.fetchall():
        last_lei = result[0]

    # "repex" is short for "reporting exceptions", shorten the headers
    repex_headers = [
        'LEI',
        'ExceptionCategory',
        'ExceptionReason1',
        'ExceptionReason2',
        'ExceptionReason3',
        'ExceptionReason4',
        'ExceptionReason5',
        'ExceptionReference1',
        'ExceptionReference2',
        'ExceptionReference3',
        'ExceptionReference4',
        'ExceptionReference5'
    ]

    df = pd.read_csv(full_filename, header=0, quotechar='"')

    # Change to the column headers generated in VBA
    df.columns = repex_headers

    for colname in df.columns:
        df[colname] = df[colname].astype(str)
        df[colname] = df[colname].replace({'nan': ''})


    place_holder = '?,?'
    for i in range(1, len(repex_headers)):
        place_holder  = ',?'

    sql_str = "exec save_gleif_reporting_exception "   place_holder

    row_count = 0
    row = dict()
    do_not_upload = True
    if last_lei == '':
        do_not_upload = False   # There was no last uploaded record, so we can start now

    for index, row in df.iterrows():
        row_count  = 1
        if do_not_upload:
            if row['LEI'] == last_lei:
                do_not_upload = False
                continue
            else:
                continue

        values = (
            row['LEI'],
            row['ExceptionCategory'],
            row['ExceptionReason1'],
            row['ExceptionReason2'],
            row['ExceptionReason3'],
            row['ExceptionReason4'],
            row['ExceptionReason5'],
            row['ExceptionReference1'],
            row['ExceptionReference2'],
            row['ExceptionReference3'],
            row['ExceptionReference4'],
            row['ExceptionReference5'],
            filename
        )

        if index % 1000 == 0:
                print("Imported %s rows" % (index))

        # print(values)
        # print("processing row ", row_count)
        # return Key is the unique ID the database generated as it inserted this row of data.
        error_sql_str = "exec log_message ?,?,?,?,?, ?,?,?,?"
        connection_failures = 0
        connection_failing = True
        while connection_failures < 3 and connection_failing:
            try:
                return_key = cursor.execute(sql_str, values).fetchval()
            except pyodbc.OperationalError as e:
                connection_failures  = 1
                connection_failing = True
                print("Connection issue.  connection failures = ", connection_failures)
                time.sleep(30)      # wait 30 seconds and go to the top of the loop to try again.
                continue
            except pyodbc.ProgrammingError as e:
                print("Bad field ", values)
                error_values = (
                    'ERROR',
                    __file__,
                    filename,
                    'gleif_reporting_exceptions',
                    row['LEI'],
                    '',
                    '',
                    '',
                    str(e)
                )
                return_key = cursor.execute(error_sql_str, error_values).fetchval()
                connection_failures = 0
            connection_failures = 0
            connection_failing = False

        if connection_failures >= 3:
            print("Unable to reconnect after 3 tries")
            exit(1)

    conn.close()
    return
  

Я открываю базу данных следующим образом:

 def init_connection(server_name, db_name):
    """
    Connect to SQL Server database
    :param server_name:
    :param db_name:
    :return:
    """
    pyodbc.pooling = False
    try:
        conn = pyodbc.connect(
            r'DRIVER={ODBC Driver 17 for SQL Server};SERVER='   server_name   '; 
            Database='   db_name   ';Trusted_Connection=yes;', timeout=5, autocommit=True)
    except Exception as e:
        print("Unable to connect to database ["   db_name   '] and server ['   server_name   ']')
        print(e)
        exit(1)

    cursor = conn.cursor()
    return [conn, cursor]
  

Хорошо.

Таблица определяется следующим образом:

 CREATE TABLE [dbo].[gleif_exceptions](
    [id] [bigint] IDENTITY(1,1) NOT NULL,
    [ida_last_update_date] [datetime] NULL,
    [ida_last_update_source_file] [nvarchar](500) NULL,
    [LEI] [nvarchar](500) NULL,
    [ExceptionCategory] [nvarchar](500) NULL,
    [ExceptionReason1] [nvarchar](500) NULL,
    [ExceptionReason2] [nvarchar](500) NULL,
    [ExceptionReason3] [nvarchar](500) NULL,
    [ExceptionReason4] [nvarchar](500) NULL,
    [ExceptionReason5] [nvarchar](500) NULL,
    [ExceptionReference1] [nvarchar](500) NULL,
    [ExceptionReference2] [nvarchar](500) NULL,
    [ExceptionReference3] [nvarchar](500) NULL,
    [ExceptionReference4] [nvarchar](500) NULL,
    [ExceptionReference5] [nvarchar](500) NULL
) ON [PRIMARY]
GO
  

Вот несколько примеров данных:

 LEI,Exception.Category,Exception.Reason.1,Exception.Reason.2,Exception.Reason.3,Exception.Reason.4,Exception.Reason.5,Exception.Reference.1,Exception.Reference.2,Exception.Reference.3,Exception.Reference.4,Exception.Reference.5
004L5FPTUREIWK9T2N63,DIRECT_ACCOUNTING_CONSOLIDATION_PARENT,NON_CONSOLIDATING,,,,,,,,,
00EHHQ2ZHDCFXJCPCL46,DIRECT_ACCOUNTING_CONSOLIDATION_PARENT,NON_CONSOLIDATING,,,,,,,,,
  

И вот соответствующая хранимая процедура, которую я вызываю для сохранения записи в таблице:

 ALTER PROCEDURE [dbo].[save_gleif_reporting_exception]
    @LEI [nvarchar] (500) = NULL,
    @ExceptionCategory [nvarchar] (500) = NULL,
    @ExceptionReason1 [nvarchar] (500) = NULL,
    @ExceptionReason2 [nvarchar] (500) = NULL,
    @ExceptionReason3 [nvarchar] (500) = NULL,
    @ExceptionReason4 [nvarchar] (500) = NULL,
    @ExceptionReason5 [nvarchar] (500) = NULL,
    @ExceptionReference1 [nvarchar] (500) = NULL,
    @ExceptionReference2 [nvarchar] (500) = NULL,
    @ExceptionReference3 [nvarchar] (500) = NULL,
    @ExceptionReference4 [nvarchar] (500) = NULL,
    @ExceptionReference5 [nvarchar] (500) = NULL,
    @ida_last_update_source_file [nvarchar] (500) NULL
AS
BEGIN
    -- SET NOCOUNT ON added to prevent extra result sets from
    -- interfering with SELECT statements.
    SET NOCOUNT ON;

    -- Insert statements for procedure here
    INSERT INTO dbo.gleif_reporting_exceptions(
        [LEI],
        [ExceptionCategory],
        [ExceptionReason1],
        [ExceptionReason2],
        [ExceptionReason3],
        [ExceptionReason4],
        [ExceptionReason5],
        [ExceptionReference1],
        [ExceptionReference2],
        [ExceptionReference3],
        [ExceptionReference4],
        [ExceptionReference5],
        [ida_last_update_date],
        [ida_last_update_source_file]
    )
    VALUES (
        @LEI,
        @ExceptionCategory,
        @ExceptionReason1,
        @ExceptionReason2,
        @ExceptionReason3,
        @ExceptionReason4,
        @ExceptionReason5,
        @ExceptionReference1,
        @ExceptionReference2,
        @ExceptionReference3,
        @ExceptionReference4,
        @ExceptionReference5,
        GETDATE(),
        @ida_last_update_source_file
    
    )

    SELECT @@IDENTITY

END
  

Примечание 1: хотя я объявляю строку как nvarchar (500), большинство из них не такие длинные. Я не думаю, что это имеет значение. Я пробовал использовать более короткие определения строк, и для запуска процедуры все еще требуется очень много времени.

Примечание 2: Это всего лишь один пример из 7 на данный момент. Самые маленькие таблицы содержат порядка нескольких 10-х K-строк, а их количество достигает нескольких миллионов. Количество столбцов варьируется от 7 до примерно 230.

Комментарии:

1. Похоже, что ваш код ничего не делает с return_key . Используется ли он где-либо еще, т. Е. Вам действительно нужно его извлечь?

2. Я не использую его в данном случае. Я заимствовал код из другой работы, которую я выполнял ранее, где я использовал return_key. Я могу извлечь это и посмотреть, ускорит ли это процесс.

3. Хорошо, тогда вы можете вставить непосредственно в таблицу вместо использования хранимой процедуры?

Ответ №1:

Поскольку вам не нужно возвращаемое значение из хранимой процедуры, вы должны иметь возможность просто использовать to_sql метод pandas для вставки строк непосредственно в таблицу. Этот код …

 from time import time
import pandas as pd
import sqlalchemy as sa

from_engine = sa.create_engine("mssql pyodbc://@mssqlLocal64")
to_engine = sa.create_engine(
    "mssql pyodbc://sa:_whatever_@192.168.0.199/mydb"
    "?driver=ODBC Driver 17 for SQL Server",
    fast_executemany=False,
)

# set up test
to_cnxn = to_engine.raw_connection()
to_cnxn.execute("TRUNCATE TABLE MillionRows")
to_cnxn.commit()
num_rows_to_upload = 10000
df = pd.read_sql_query(
    f"SELECT TOP {num_rows_to_upload} "
    "[TextField], [LongIntegerField], [DoubleField], [varchar_column] "
    "FROM MillionRows ORDER BY ID",
    from_engine,
)

# run test
t0 = time()
df.to_sql("MillionRows", to_engine, index=False, if_exists="append")
s = f"{(time() - t0):0.1f} seconds"
print(f"uploading {num_rows_to_upload:,d} rows took {s}")
  

… представляет примерно тот же внутренний уровень усилий, что и то, что вы делаете сейчас, т. Е. загружаете каждую отдельную строку как отдельный .execute вызов. Результатом является

 uploading 10,000 rows took 60.2 seconds
  

Однако простое изменение to_engine для использования fast_executemany=True приводит к

 uploading 10,000 rows took 1.4 seconds
  

Комментарии:

1. Извините. Вчера и большую часть сегодняшнего дня была предпринята еще одна попытка. Надеюсь, я рассмотрю это до конца дня. Если это сработает для меня, я проверю это. Независимо от того, кто голосует за.

2. Мне все еще нужно решить некоторые проблемы с моим администратором базы данных, но я думаю, что это сработает. tx

Ответ №2:

Отключите автоматическую фиксацию

  conn = pyodbc.connect(
        r'DRIVER={ODBC Driver 17 for SQL Server};SERVER='   server_name   '; 
        Database='   db_name   ';Trusted_Connection=yes;', timeout=5, autocommit=False)
 
  

и зафиксируйте здесь и в конце цикла.

     if index % 1000 == 0:
            print("Imported %s rows" % (index))
  

При автоматической фиксации вам нужно дождаться сохранения файла журнала на диск после каждой строки.

Для дальнейшей оптимизации, если вы используете SQL 2016 , используйте JSON для отправки пакетов строк на SQL Server, анализируя на стороне сервера с помощью OPENJSON.

Комментарии:

1. Изменение фиксации, похоже, не приводит к заметному изменению скорости выполнения. Я взгляну на путь OPENJSON.

2. Пожалуйста, добавьте к вопросу вашу наблюдаемую скорость вставки в строках / сек и (если SQL 2016) повторный select * from sys.dm_exec_session_wait_stats where session_id = @@spid запуск после загрузки. Для SQL-сервера в локальной сети скорость вставки одной строки с пакетной обработкой транзакций должна составлять 100 с / с. 1000 в секунду при использовании JSON и 10000 в секунду при реальной массовой загрузке.