#python #pandas #multiprocessing #concurrent.futures #process-pool
#python #pandas #многопроцессорная обработка #concurrent.futures #пул процессов
Вопрос:
Я пытаюсь парализовать одну из моих функций сопоставления, и она работает в начале. Так приятно видеть, что мой 72-ядерный экземпляр ec2 убивает его, примерно через минуту или около того он возвращается к одноядерному, и итерация в секунду начинает падать.
import concurrent.futures as cf
results = pd.DataFrame()
with cf.ProcessPoolExecutor() as executor:
for res in tqdm(executor.map(matcher_helper, list(range(len(df))))):
results = pd.concat([results, res], axis=0)
В самом начале я вижу это
Затем это переходит к следующему
Примерно на минуту обработка довольно приятная, чем одноядерная. При многопроцессорной обработке итерация составляет около 250 в секунду, и она снижается до 35 в секунду.
Любые рекомендации приветствуются.
РЕДАКТИРОВАТЬ — Дополнительная информация — Моя исходная функция:
def matcher(data,
data_radial_matrice,
data_indice,
comparison_data,
comparison_radial_matrice,
distance_threshold=.1):
import pandas as pd
from sklearn.metrics.pairwise import haversine_distances
from fuzzywuzzy import fuzz
import numpy as np
lvl1 = haversine_distances(data_radial_matrice[data_indice].reshape(-1, 2),
comparison_radial_matrice) * 3959
lvl2 = pd.Series(lvl1[lvl1 < distance_threshold])
lvl1 = pd.DataFrame(np.argwhere(lvl1 < distance_threshold)).iloc[:, 1]
lvl3 = pd.concat((lvl1, lvl2), axis=1)
lvl3.columns = ['neigh_index', 'distance']
lvl3.set_index('neigh_index', inplace=True)
lvl3 = lvl3.merge(comparison_data,
left_index=True,
right_index=True,
how='inner')
lvl4 = lvl3.loc[:, 'match_text'].apply(
lambda x: fuzz.token_set_ratio(data.loc[data_indice, 'match_text'], x))
lvl5 = np.where(lvl4 == np.max(lvl4))
interim_result = lvl3.iloc[lvl5]
interim_result['match_score'] = np.max(lvl4)
interim_result['adp_indice'] = data_indice
return interim_result
Комментарии:
1. Есть ли у вас минимальный воспроизводимый пример — небольшая программа, которую мы могли бы запустить, чтобы наблюдать такое же поведение?
2. На самом деле я думаю о способе поделиться частью, но данные, которые я использую, являются строго конфиденциальными, и подделать тестовые данные в таком размере будет невероятно сложно. Но я поделюсь своими внутренними функциями в своем редактировании.
3. @Tolga Немного не по теме: поскольку вам, по-видимому, требуется большая производительность, вы можете заменить fuzzywuzzy на github.com/maxbachmann/rapidfuzz .
4. Спасибо, Макс, любое улучшение производительности очень приветствуется в моей работе, я обязательно сделаю снимок rapidfuzz.
Ответ №1:
Основное узкое место в производительности вызвано pandas.concat
процессом, когда я изменил часть сбора результатов np.concatenate
на ту, которая решила проблему. В серверной части pandas после определенного порога ввода-вывода это замедляет весь процесс и убивает многоядерную обработку.
Я внес небольшие изменения в свой код, в конце я вернул массив numpy.
def matcher2(data,
data_radial_matrice,
data_indice,
comparison_data,
comparison_radial_matrice,
distance_threshold=.1):
''' Haversine Distance between selected data point and comparison data points are calculated in miles
by default is limited to .1 mile distance and among this filtered resuls matching is done and max score records are returned
'''
import pandas as pd
from sklearn.metrics.pairwise import haversine_distances
from fuzzywuzzy import fuzz
import numpy as np
lvl1 = haversine_distances(data_radial_matrice[data_indice].reshape(-1, 2),
comparison_radial_matrice) * 3959
lvl2 = pd.Series(lvl1[lvl1 < distance_threshold])
lvl1 = pd.DataFrame(np.argwhere(lvl1 < distance_threshold)).iloc[:, 1]
lvl3 = pd.concat((lvl1, lvl2), axis=1)
lvl3.columns = ['neigh_index', 'distance']
lvl3.set_index('neigh_index', inplace=True)
lvl3 = lvl3.merge(comparison_data,
left_index=True,
right_index=True,
how='inner')
lvl4 = lvl3.loc[:, 'match_text'].apply(
lambda x: fuzz.token_set_ratio(data.loc[data_indice, 'match_text'], x))
lvl5 = np.where(lvl4 == np.max(lvl4))
interim_result = lvl3.iloc[lvl5]
interim_result['match_score'] = np.max(lvl4)
interim_result['adp_indice'] = data_indice
return np.array(interim_result)
В конце, пока я анализирую результаты.
def dnb_matcher_helper(indice):
return matcher2(adp, adp_rad, indice, dnb, dnb_rad)
import concurrent.futures as cf
dnb_results = np.empty(shape=(1,35))
with cf.ProcessPoolExecutor() as executor:
for res in tqdm(executor.map(dnb_matcher_helper,
list(range(len(adp))))):
if len(res) == 0:
continue
else:
for line in res:
line = line.reshape((1,35))
dnb_results = np.concatenate((dnb_results, line), axis=0)