Лучший способ оптимизировать сложный цикл, который повторяет фрейм данных

#python #pandas #dataframe #pyspark #luigi

Вопрос:

Здесь у меня есть пара методов, которые занимают больше времени, чем хотелось бы. В настоящее время я упираюсь в стену, поскольку не вижу никакого очевидного способа написать эти методы более эффективным способом.

Для справки, то, что делает код, — это обработка набора данных о продажах, чтобы найти предыдущие заказы на продажу, связанные с тем же клиентом. Однако, как вы увидите, в середине много бизнес-логики, которая, вероятно, замедляет работу.

Я думал о рефакторинге этого в PySpark job, но прежде чем я это сделаю, я хотел бы знать, лучший ли это способ сделать это.

Я буду очень признателен за любые предложения здесь.

Больше контекста: выполнение каждого цикла занимает около 10 минут. В нем около 24 тыс. строк search_keys . Эти методы являются частью задачи Луиджи.

 def previous_commits(self, df: pd.DataFrame):
# Build some filters to slice data:
search_keys = df.loc[:, ['accountid', 'opportunityid']].drop_duplicates()
cols_a = ['opportunityid', 'opportunity_type', 'platform', 'closedate']
cols_b = ['opportunityid', 'productid']

# Build a list with the previous commit oppy_id:
commits = [
    {
        'acc_id': acc,
        'current_oppy': oppy,
        'previous_commit': self.fetch_latest_commit(oppy, df.loc[df.accountid == acc, cols_a].drop_duplicates())
    }
    for oppy, acc in tqdm(
        zip(search_keys.opportunityid, search_keys.accountid),
        desc='Finding previous commits data',
        file=sys.stdout,
        total=search_keys.shape[0]
    )
]

# Fetch products for the previous commit as well as the current row oppy:
products = [
    {
        'current_oppy': x.get('current_oppy'),
        'current_products': self.fetch_products_id(
            [x.get('current_oppy')],
            df.loc[df.accountid == x.get('acc_id'), cols_b].drop_duplicates()
        ),
        'previous_products': self.fetch_products_id(
            x.get('previous_commit'),
            df.loc[df.accountid == x.get('acc_id'), cols_b].drop_duplicates()
        ),
        'previous_recurrent_products': self.fetch_products_id(
            x.get('previous_commit'),
            df.loc[(df.accountid == x.get('acc_id')) amp; (df.fee_type == 'Recurring'), cols_b].drop_duplicates()
        )
    }
    for x in tqdm(
        commits,
        desc='Finding previous commit products',
        file=sys.stdout
    )
]

# Pick new calculated column and change its name for compatibility:
df = pd.DataFrame(commits).join(pd.DataFrame(products).set_index('current_oppy'), on='current_oppy')
df = df.loc[:, ['current_oppy', 'previous_commit', 'current_products', 'previous_recurrent_products']]
df.columns = ['current_oppy', 'previous_commit', 'current_products', 'previous_products']
return df

@staticmethod
def fetch_latest_commit(oppy_id: str, data: pd.DataFrame):
# Build some filters and create a df copy to search against:
data = data.set_index('opportunityid')
current_closedate = data.loc[data.index == oppy_id, ['closedate']].iat[0, 0]
current_platform = data.loc[data.index == oppy_id, ['platform']].iat[0, 0]
date_filter = data.closedate < current_closedate
platform_filter = data.platform == current_platform
eb_filter = data.opportunity_type != 'EB'
subset = data.loc[date_filter amp; eb_filter, :].sort_values('closedate', ascending=False)

if current_platform in {'CL', 'WE'}:
    # Fetch latest commit closedate for the same platform:
    subset = data.loc[date_filter amp; platform_filter amp; eb_filter, :].sort_values('closedate', ascending=False)
    latest_commit_date = subset.loc[:, 'closedate'].max()
    latest_commit_filter = subset.closedate == latest_commit_date
else:
    # Fetch latest commit closedate:
    latest_commit_date = subset.loc[:, 'closedate'].max()
    latest_commit_filter = subset.closedate == latest_commit_date

# Now try to get the latest commit oppy_id (if available), otherwise, just exit the function
# and return the current oppy_id. If the latest commit is a NB or NBU
# deal, then make another lookup to ensure that all the NB info is gathered since they might
# have different closedates.

try:
    latest_commit_id = list(subset.loc[latest_commit_filter, :].index)
    latest_commitid_filter = subset.index.isin(latest_commit_id)
    latest_commit_type = subset.loc[latest_commitid_filter, 'opportunity_type'].unique()[0]
except IndexError:
    return {oppy_id}

if latest_commit_type == 'RN':
    return set(latest_commit_id)
else:
    try:
        nb_before_latest_commit_filter = subset.closedate < latest_commit_date
        nb_only_filter = subset.opportunity_type == 'NB'
        nb_commit_id = list(subset.loc[nb_only_filter amp; nb_before_latest_commit_filter, :].index)
        return set(latest_commit_id   nb_commit_id)
    except IndexError:
        return set(latest_commit_id)

@staticmethod
def fetch_products_id(oppy_ids: list, data: pd.DataFrame):
    data = data.set_index('opportunityid')
    return set(data.loc[data.index.isin(oppy_ids), 'productid'])
 

Комментарии:

1. У вас есть набор данных о продажах с Client_ID, order_ID и датой? И вы хотите найти предыдущий порядок для и Client_ID ?

2. Я не уверен, работаете ли вы в распределенной вычислительной системе, но если да, и вы работаете с pandas, следствием этого является то, что у вас не будет распараллеливания. Что означает низкую производительность

3. Означает ли это, что, возможно, стоит попробовать PySpark? Как это будет работать?

4. Вы должны проверить dask

Ответ №1:

«Очень простыми словами, Pandas выполняет операции на одной машине, в то время как PySpark работает на нескольких машинах. Если вы работаете над приложением машинного обучения, где имеете дело с большими наборами данных, лучше всего подходит PySpark, который может обрабатывать операции во много раз (в 100 раз) быстрее, чем Pandas. »

от https://sparkbyexamples.com/pyspark/pandas-vs-pyspark-dataframe-with-examples /

Вы также должны подумать о функциональном подходе Windows, чтобы получить предыдущий порядок. Это позволит избежать зацикливания на всех записях.