группы панд объединяются в даск

#python #dask

Вопрос:

У меня есть код панды, который я хотел бы перевести на Dask

Давайте возьмем фиктивные данные

 import dask.dataframe as dd
df = pd.DataFrame({'item_id': [10, 10, 10, 8, 8, 8], 'rating': [3, 4, 2, 1, 2, 3]})
ddf = dd.from_pandas(df, npartitions=2)
 

вот код панды

 bb = df[['item_id', 'rating']].
        groupby(['item_id']).agg(
        item_hist_rating_up=pd.NamedAgg(column='rating', aggfunc=lambda x: round(100 * sum(x>=3.75) / len(x))),
        item_hist_rating_down=pd.NamedAgg(column='rating', aggfunc=lambda x: round(100 * sum(x<3.75) / len(x))),
        item_hist_rating_q25=pd.NamedAgg(column='rating', aggfunc=lambda x: np.quantile(x, q = 0.25 )),
        item_hist_rating_q75=pd.NamedAgg(column='rating', aggfunc=lambda x: np.quantile(x, q = 0.75 )),
        item_hist_rating_min=pd.NamedAgg(column='rating', aggfunc='min'),
        item_hist_rating_count=pd.NamedAgg(column='rating', aggfunc='count'),
        item_hist_rating_max=pd.NamedAgg(column='rating', aggfunc='max'),
        item_hist_rating_avg=pd.NamedAgg(column='rating', aggfunc=np.mean),
    ).reset_index().round(2)
bb
 

Я знаю, что с помощью Dask можно вычислить четыре из этих чисел следующим образом

 ddf.groupby(['item_id'])['rating'].aggregate(['sum', 'mean', 'max', 'min']).compute()
 

и еще два таких же

 ddf['rating'].ge(3.75).groupby(ddf['item_id']).mean().compute()
ddf['rating'].lt(3.75).groupby(ddf['item_id']).mean().compute()
 

но я не мог понять 1) как сделать групповую работу.квантиль — ни то, ни другое 2) как объединить эти результаты ?

Ответ №1:

 import numpy as np
import pandas as pd
import dask.dataframe as dd

stats_df = ddf.groupby(['item_id'])['rating'].aggregate(['sum', 'mean', 'max', 'min', 'count'])
stats_df['rating_up'] = ddf['rating'].ge(3.75).groupby(ddf['item_id']).mean() * 100
stats_df['rating_down'] = ddf['rating'].lt(3.75).groupby(ddf['item_id']).mean() * 100

q25 = ddf.groupby('item_id')['rating'].apply(
    lambda x: x.quantile(0.25))

q75 = ddf.groupby('item_id')['rating'].apply(
    lambda x: x.quantile(0.75))
  
qdf = dd.merge(stats_df, q25, left_index=True, right_index=True)
edf = dd.merge(stats_df, q75, left_index=True, right_index=True)
ldf = dd.merge(qdf, edf[['rating']], left_index=True, right_index=True)
ldf.columns = ['sum', 'mean', 'max', 'min', 'count', 'rating_up', 'rating_down', 'q25', 'q75']
ldf.compute().reset_index().round(2)
 

Выход:

 Out[24]: 
   item_id  sum  mean  max  min  count  rating_up  rating_down  q25  q75
0        8    6   2.0    3    1      3       0.00       100.00  1.5  2.5
1       10    9   3.0    4    2      3      33.33        66.67  2.5  3.5
 

В качестве альтернативы используйте dask.delayed для параллельного вычисления команд pandas. Согласно их документации, параллелизм достигается за счет большого количества отложенных вызовов. Таким образом, ваша агрегация разбита на несколько функций с задержкой ниже.

 from dask import delayed

@delayed
def rating_up(x):
    return x.groupby(['item_id']).agg(
        rating_up=pd.NamedAgg(column='rating', aggfunc=lambda x: round(100 * sum(x>=3.75) / len(x), 2)))

@delayed
def rating_down(x):
    return x.groupby(['item_id']).agg(
        rating_down=pd.NamedAgg(column='rating', aggfunc=lambda x: round(100 * sum(x<3.75) / len(x), 2)))

@delayed
def q_25(x):
    return x.groupby(['item_id']).agg(
        rating_q25=pd.NamedAgg(column='rating', aggfunc=lambda x: np.quantile(x, q=0.25 )))

@delayed
def q_75(x):
    return x.groupby(['item_id']).agg(
        rating_q75=pd.NamedAgg(column='rating', aggfunc=lambda x: np.quantile(x, q=0.75 )))

@delayed
def rating_min(x):
    return x.groupby(['item_id']).agg(
        rating_min=pd.NamedAgg(column='rating', aggfunc='min'))

@delayed
def rating_max(x):
    return x.groupby(['item_id']).agg(
        rating_max=pd.NamedAgg(column='rating', aggfunc='max'))

@delayed
def rating_count(x):
    return x.groupby(['item_id']).agg(
        rating_count=pd.NamedAgg(column='rating', aggfunc='count'))

@delayed
def rating_avg(x):
    return x.groupby(['item_id']).agg(
        rating_avg=pd.NamedAgg(column='rating', aggfunc=np.mean))


def stats(x):
    count_ = rating_count(x)
    up = rating_up(x)
    down = rating_down(x)
    q25 = q_25(x)
    q75 = q_75(x)
    rate_min = rating_min(x)
    rate_max = rating_max(x)
    rate_avg = rating_avg(x)
    ddf = count_.join(up).join(down).join(q25).join(q75).join(rate_min).
        join(rate_max).join(rate_avg)
    return ddf

stats_df = stats(df)
print(stats_df.compute().reset_index().round(2))
 

Выход:

    item_id  rating_count  rating_up  rating_down  rating_q25  rating_q75  rating_min  rating_max  rating_avg
0        8             3       0.00       100.00         1.5         2.5           1           3           2
1       10             3      33.33        66.67         2.5         3.5           2           4           3
 

Комментарии:

1. Спасибо — можно ли использовать агрегатную функцию ? и избежать слияния ? (Я просто думаю о производительности — мои данные огромны) ? тебе не кажется, что это было бы умнее ?

2. Согласно документации Dask, объединение индексов происходит быстро. docs.dask.org/en/latest/dataframe-best-practices.html

3. Я пытался сделать что-то вроде — > ddf.groupby(‘item_id’).agg ({«рейтинг»: [«мин», «макс», «количество», «среднее», custom1, custom2, custom3]}) — это то, что вы можете добавить к своему ответу ? Я уверен, что будущие посетители по достоинству оценят это, так как подобных примеров не так много.

4. Не без того, чтобы потратить часы/дни, пытаясь сделать это. Квантильный метод Даска ведет себя довольно странно.

5. Я тоже ничего не понимаю. Я думаю, что часть фрейма данных dask выглядит немного странно из-за нескольких слияний. Однако настоящее решение-единственный способ, которым я мог бы заставить фрейм данных dask работать с вычислением квантиля. Все остальное приводило к ошибкам, и мне потребовалось добрых два часа, чтобы пройти их. Я собираюсь поднять этот вопрос как проблему с участниками Dask. Кстати, если ваши сводные статистические данные не слишком велики, вы можете вызывать метод вычисления после каждого вычисления статистики с помощью Dask, а затем объединять их с помощью pandas. 1 от меня также за хорошо сформулированный вопрос.