Python 3.8 — concurrent.futures.Производительность ProcessPoolExecutor снижается со временем

#python #pandas #multiprocessing #concurrent.futures #process-pool

#python #pandas #многопроцессорная обработка #concurrent.futures #пул процессов

Вопрос:

Я пытаюсь парализовать одну из моих функций сопоставления, и она работает в начале. Так приятно видеть, что мой 72-ядерный экземпляр ec2 убивает его, примерно через минуту или около того он возвращается к одноядерному, и итерация в секунду начинает падать.

 import concurrent.futures as cf

results = pd.DataFrame()

with cf.ProcessPoolExecutor() as executor:
    for res in tqdm(executor.map(matcher_helper, list(range(len(df))))):
        results = pd.concat([results, res], axis=0)
  

В самом начале я вижу это

многоядерная обработка

Затем это переходит к следующему

одноядерный сброс

Примерно на минуту обработка довольно приятная, чем одноядерная. При многопроцессорной обработке итерация составляет около 250 в секунду, и она снижается до 35 в секунду.

Любые рекомендации приветствуются.

РЕДАКТИРОВАТЬ — Дополнительная информация — Моя исходная функция:

 def matcher(data,
            data_radial_matrice,
            data_indice,
            comparison_data,
            comparison_radial_matrice,
            distance_threshold=.1):
    

    import pandas as pd
    from sklearn.metrics.pairwise import haversine_distances
    from fuzzywuzzy import fuzz
    import numpy as np

    lvl1 = haversine_distances(data_radial_matrice[data_indice].reshape(-1, 2),
                               comparison_radial_matrice) * 3959
    lvl2 = pd.Series(lvl1[lvl1 < distance_threshold])

    lvl1 = pd.DataFrame(np.argwhere(lvl1 < distance_threshold)).iloc[:, 1]

    lvl3 = pd.concat((lvl1, lvl2), axis=1)
    lvl3.columns = ['neigh_index', 'distance']
    lvl3.set_index('neigh_index', inplace=True)
    lvl3 = lvl3.merge(comparison_data,
                      left_index=True,
                      right_index=True,
                      how='inner')

    lvl4 = lvl3.loc[:, 'match_text'].apply(
        lambda x: fuzz.token_set_ratio(data.loc[data_indice, 'match_text'], x))
    lvl5 = np.where(lvl4 == np.max(lvl4))
    interim_result = lvl3.iloc[lvl5]
    interim_result['match_score'] = np.max(lvl4)
    interim_result['adp_indice'] = data_indice

    return interim_result
  

Комментарии:

1. Есть ли у вас минимальный воспроизводимый пример — небольшая программа, которую мы могли бы запустить, чтобы наблюдать такое же поведение?

2. На самом деле я думаю о способе поделиться частью, но данные, которые я использую, являются строго конфиденциальными, и подделать тестовые данные в таком размере будет невероятно сложно. Но я поделюсь своими внутренними функциями в своем редактировании.

3. @Tolga Немного не по теме: поскольку вам, по-видимому, требуется большая производительность, вы можете заменить fuzzywuzzy на github.com/maxbachmann/rapidfuzz .

4. Спасибо, Макс, любое улучшение производительности очень приветствуется в моей работе, я обязательно сделаю снимок rapidfuzz.

Ответ №1:

Основное узкое место в производительности вызвано pandas.concat процессом, когда я изменил часть сбора результатов np.concatenate на ту, которая решила проблему. В серверной части pandas после определенного порога ввода-вывода это замедляет весь процесс и убивает многоядерную обработку.

Я внес небольшие изменения в свой код, в конце я вернул массив numpy.

 def matcher2(data,
        data_radial_matrice,
        data_indice,
        comparison_data,
        comparison_radial_matrice,
        distance_threshold=.1):
'''  Haversine Distance between selected data point and comparison data points are calculated in miles
    by default is limited to .1 mile distance and among this filtered resuls matching is done and max score records are returned
'''

import pandas as pd
from sklearn.metrics.pairwise import haversine_distances
from fuzzywuzzy import fuzz
import numpy as np

lvl1 = haversine_distances(data_radial_matrice[data_indice].reshape(-1, 2),
                           comparison_radial_matrice) * 3959
lvl2 = pd.Series(lvl1[lvl1 < distance_threshold])

lvl1 = pd.DataFrame(np.argwhere(lvl1 < distance_threshold)).iloc[:, 1]

lvl3 = pd.concat((lvl1, lvl2), axis=1)
lvl3.columns = ['neigh_index', 'distance']
lvl3.set_index('neigh_index', inplace=True)
lvl3 = lvl3.merge(comparison_data,
                  left_index=True,
                  right_index=True,
                  how='inner')

lvl4 = lvl3.loc[:, 'match_text'].apply(
    lambda x: fuzz.token_set_ratio(data.loc[data_indice, 'match_text'], x))
lvl5 = np.where(lvl4 == np.max(lvl4))
interim_result = lvl3.iloc[lvl5]
interim_result['match_score'] = np.max(lvl4)
interim_result['adp_indice'] = data_indice

return np.array(interim_result)
  

В конце, пока я анализирую результаты.

 def dnb_matcher_helper(indice):
    return matcher2(adp, adp_rad, indice, dnb, dnb_rad)

import concurrent.futures as cf

dnb_results = np.empty(shape=(1,35))

with cf.ProcessPoolExecutor() as executor:
    for res in tqdm(executor.map(dnb_matcher_helper, 
list(range(len(adp))))):
    if len(res) == 0:
        continue
    else:
        for line in res:
            line = line.reshape((1,35))
            dnb_results = np.concatenate((dnb_results, line), axis=0)