Многопроцессорность в фрейме данных pandas

#pandas #multithreading #dataframe #parallel-processing #multiprocessing

#pandas #многопоточность #фрейм данных #параллельная обработка #многопроцессорность

Вопрос:

Я применяю функцию к столбцу фрейма данных, но я хочу сделать это быстрее, поскольку функция требует много времени обработки при последовательном выполнении.

 df[df['codes']=='None']['q'][:1].apply(lambda x: clf(x,candidate_labels))
  

Последовательно выполняется только одна строка 2.52 secs для запуска, но при выполнении приведенного ниже кода с использованием многопроцессорной обработки это занимает намного больше времени, 51.61 secs и у меня есть около 2500 rows для обработки, поэтому для запуска функции потребуется много времени. Я хочу, по крайней мере, ускорить его 20% .

 import multiprocessing
import pandas as pd
import numpy as np

def clf(x):
    ...
    return list
def _apply_df(args):
    df, func, kwargs = args
    return df.apply(func, **kwargs)

def apply_by_multiprocessing(df, func, **kwargs):
    workers = kwargs.pop('workers')
    pool = multiprocessing.Pool(processes=workers)
    result = pool.map(_apply_df, [(d, func, kwargs)
            for d in np.array_split(df, workers)])
    pool.close()
    return pd.concat(list(result))
    
if __name__ == '__main__':
    tart_time = time.time()
    res=apply_by_multiprocessing(df[df['codes']=='None']['q'][:1],clf, workers=4)  
    print(res)
    print("--- %s seconds ---" % (time.time() - start_time))
    ## run by 4 processors
  

Я также пробовал разные итерации для многопроцессорной обработки, но, похоже, ни одна из них не ускоряет поток, поскольку они замедляют мой код.

 from pandarallel import pandarallel
import time
pandarallel.initialize(progress_bar=True)

start_time = time.time()
categories = df[df['codes']=='None']['q'][:10].parallel_apply(lambda x: clf(x,candidate_labels))
print("--- %s seconds ---" % (time.time() - start_time))
  

Еще один эксперимент:

 import multiprocessing as mp

def clf:
    ...
    return list

if __name__ == '__main__':
    p = mp.Pool(processes=8)
    pool_results = p.map(clf, df[df['codes']=='None']['q'][:1])
    p.close()
    p.join()
  

Комментарии:

1. Что clf делает? Возможно, существуют векторизованные альтернативы… кроме того, для настройки многопроцессорности требуется время, и это компенсируется масштабированием, но если нет четкого способа разделить функцию на потоки, вы получаете нулевое время и тратите больше времени на настройку вызова

2. @RichieV Спасибо за ваш ответ, so clf на самом деле использует предварительно подготовленную модель для классификации текста, которую я применяю к каждой строке фрейма данных. Каждая строка представляет собой предложение, которое необходимо классифицировать по определенным темам.

3. Какую модель вы используете? Я знаю, что в sklearn в качестве опции встроена многопроцессорность

4. В противном случае, есть ли способ передать несколько строк в вашу модель? Я бы изучил это перед созданием многопроцессорного

5. @RichieV Я ничего не использую из sklearn. это модель классификации текста с нулевым результатом от HuggingFace. Функция clf просто выполняет candidate_labels=['a','b',..] cat=classifier(seq,candidate_labels) return cat

Ответ №1:

Может быть, вы можете использовать это:https://github.com/xieqihui/pandas-multiprocess

 pip install pandas-multiprocess
  
 from pandas_multiprocess import multi_process


args = {'workers': 4}
result = multi_process(func=clf, data=df, num_process=8, **args)
  

Комментарии:

1. Я только что попробовал это, и для одной строки требуется около 17 secs . Все еще медленнее, чем .apply функция, и не соответствует ожидаемым результатам многопроцессорной обработки.