Не удается добавить большие фреймы данных с помощью pandas во вложенных циклах. Как перейти на numpy векторизацию?

#python #pandas #append #vectorization #nested-loops

#python #pandas #добавить #векторизация #вложенные циклы

Вопрос:

Мне нужно загрузить огромную таблицу (6 ГБ) из старой базы данных postgres, которая содержит некоторые неверные значения, которые мне нужно удалить при загрузке. Итак, я написал цикл, который пытается загружать большие фрагменты по соображениям производительности, но шаг за шагом сокращает, чтобы изолировать и отбросить неверные значения. Обычно это работает, но после более или менее 500 тыс. записей производительность быстро снижается.

Я уже обнаружил, что не рекомендуется обрабатывать большие наборы данных с помощью pandas. Вот почему я попытался использовать numpy. Но это ничего не изменило. Затем я попытался использовать понимание списка, но потерпел неудачу из-за исключений, которые я должен использовать, чтобы попытаться выполнить итерацию меньшими порциями.

С моей точки зрения, numpy-векторизация выглядит хорошей идеей, но я понятия не имею, как заставить ее работать.

https://towardsdatascience.com/how-to-make-your-pandas-loop-71-803-times-faster-805030df4f06

В общем, эту часть я хотел бы ускорить massivley.

 df = pds.read_sql_query(sql,conn,params=[(i * chunksize), chunksize])
appended_df.append(df)
products_df = pds.concat(appended_df, ignore_index=True)

 

Если приведенному выше фрагменту недостаточно контекста, ниже вы найдете еще больше.

 # set autocommit = True
conn = pyodbc.connect(conn_str, autocommit=True)

cur = conn.cursor()

# count rows for chunking
sql_count = """
select count("item_no") from "products" 
"""
cur.execute(sql_count)
sql_row_counter = cur.fetchone()[0]
print("Total rows: "   str(sql_row_counter))

# define chunksize and calculate chunks
chunksize = 35000
chunk_divisor = 100
if chunksize / chunk_divisor < 1:
    chunk_divisor = chunksize
print("Chunk devisor on error: "   str(chunk_divisor))
chksz_lvl2 = int(chunksize / chunk_divisor)
if chksz_lvl2 < 1:
    chksz_lvl2 = 1
chksz_lvl3 = int(chksz_lvl2 / chunk_divisor)
if chksz_lvl3 < 1:
    chksz_lvl3 = 1
# print settings for iteration
print("Chunksize: "   str(chunksize)   "nChunksize Level 2: "  
       str(chksz_lvl2)   "nChunksize Level 3: "   str(chksz_lvl3))
chunks = int(sql_row_counter / chunksize)
# Uncomment next row for testpurposes
chunks = 25
print("Chunks: "   str(chunks)   "n")
error_counter = 0
# iterate chunks
appended_df = []
print("Starting to iterate chunks.nPlease wait...")

for i in range(0, chunks):
            # try to iterate in full speed
            print("nNext chunk starts from "   str((i * chunksize))  
                  " with an limit of "   str(chunksize)   ".")
            try:
                # start runtime measurment
                i_start = time.time()
                # sql statement
                sql = """
                select "item_no", "description_1", "description_2", "description_3" FROM "products" order by "item_no" offset ? limit ?"""
                # store into dataframe
                df = pds.read_sql_query(sql,
                                        conn,
                                        params=[(i * chunksize), chunksize])
                # get first and last value from dataframe
                head = df["item_no"].iloc[0]
                tail = df["item_no"].iloc[-1]
                # store query
                # Appending data frames via pandas.append() suddenly becomes slower by a factor of 10 from approx. 500,000 data records per 4 columns.
                appended_df.append(df)
                # stop runtime measurement
                i_end = time.time()
                # print result
                print(
                    str(i   1)   " out of "   str(chunks)   " chunks in "  
                    "{:5.3f}s".format(i_end - i_start)   " processed.")
            except:
                # collect error information
                print(
                    "nChunk "   str(i   1)  
                    " cannot be selected due to an error. Reduce chunk size from "
                      str(chunksize)   " to "   str(chksz_lvl2)  
                    ". Entering level 2.nFirst working item_no of last working chunk "
                      str(head)  
                    "nLast working item_no of last working chunk "  
                    str(tail))
                ### 2 ### Successively reduce the chunks to narrow down and isolate errors.
                for j in range(0, chunk_divisor):
                     
                  and so on...
                             ...
                                ...
                                   ...
# Merge chunks
print("nNote: Chunkzize = from row_no to row_no. Could be 1,2,3,4 = range of 4 or compleley different. Ex. 2,45,99,1002 = range of 4.nnConcatinate chunks.")
products_df = pds.DataFrame()
products_df = pds.concat(appended_df, ignore_index=True)
print("Done. "   str(error_counter)  
" rows had to be skipped. Details can be found in the full error log.")

conn.close()
 

Комментарии:

1. Вы пытались посмотреть на сторону dask? dask.org

2. С какими «плохими значениями» вы пытаетесь справиться? Все, что я вижу, — это голый результат, за исключением случаев возникновения какой-либо произвольной проблемы.

3. Каким-то образом коллегам удалось получить значения в базе данных, которые не могут быть закодированы utf8 и win1250, что является обязательным. Они мешают выбору. Уже отрегулировали в разных местах, например, у водителя и т.д.. Этот способ пока самый стабильный. Поэтому я хотел бы следовать этому. Из 500 000 записей лишь немногие являются проблематичными. Я еще не слышал о Даске. Я не являюсь нативным разработчиком и поэтому постепенно погружаюсь в тему. 🙂

4. @Tbaki Теперь я попытался переключиться на dask. В целом dask используется сейчас, но ничего не изменилось в отношении производительности. df = pds.read_sql_query(sql,conn,params=[(i * chunksize), chunksize]) dask_df = from_pandas(df, npartitions=4) appended_df.append(dask_df) products_df = multi.concat(appended_df, ignore_index=True) products_df=products_df.compute()

5. @Tbaki Я смог загрузить 1,4 млн записей в течение 20 минут. Есть еще кое-что, что нужно улучшить, но в целом это уже хорошо. Поэтому я могу окончательно подтвердить, ДА, ЭТО РАБОТАЕТ. 🙂

Ответ №1:

Я только что заметил, что скрипт python уже запущен, как ожидалось. У других фреймворков, таких как Dask, не было никаких шансов улучшить это. В моем случае исходная база данных Postgres (в моем случае v. 9.x), где я хотел бы получить некоторые данные, имеет проблему, связанную с использованием limit и order by в то же время при запросе огромных таблиц.

Я не смог обнаружить это напрямую, потому что мой инструмент SQL-запросов (DBeaver) загружает только подмножество для отображения, даже если вы хотите запросить полную таблицу. Поэтому результатом является ложный друг. Если вы хотите проверить правильность, запустите short select с довольно большим offset и limit упорядоченным порядком.

При смещении приблизительно 500 тыс. записей время выбора только одной записи в моем случае заняло около 10 секунд.

Решение состояло в том, чтобы удалить order by в моем встроенном SQL-скрипте часть «попробовать».