#python #dask
Вопрос:
У меня есть код панды, который я хотел бы перевести на Dask
Давайте возьмем фиктивные данные
import dask.dataframe as dd
df = pd.DataFrame({'item_id': [10, 10, 10, 8, 8, 8], 'rating': [3, 4, 2, 1, 2, 3]})
ddf = dd.from_pandas(df, npartitions=2)
вот код панды
bb = df[['item_id', 'rating']].
groupby(['item_id']).agg(
item_hist_rating_up=pd.NamedAgg(column='rating', aggfunc=lambda x: round(100 * sum(x>=3.75) / len(x))),
item_hist_rating_down=pd.NamedAgg(column='rating', aggfunc=lambda x: round(100 * sum(x<3.75) / len(x))),
item_hist_rating_q25=pd.NamedAgg(column='rating', aggfunc=lambda x: np.quantile(x, q = 0.25 )),
item_hist_rating_q75=pd.NamedAgg(column='rating', aggfunc=lambda x: np.quantile(x, q = 0.75 )),
item_hist_rating_min=pd.NamedAgg(column='rating', aggfunc='min'),
item_hist_rating_count=pd.NamedAgg(column='rating', aggfunc='count'),
item_hist_rating_max=pd.NamedAgg(column='rating', aggfunc='max'),
item_hist_rating_avg=pd.NamedAgg(column='rating', aggfunc=np.mean),
).reset_index().round(2)
bb
Я знаю, что с помощью Dask можно вычислить четыре из этих чисел следующим образом
ddf.groupby(['item_id'])['rating'].aggregate(['sum', 'mean', 'max', 'min']).compute()
и еще два таких же
ddf['rating'].ge(3.75).groupby(ddf['item_id']).mean().compute()
ddf['rating'].lt(3.75).groupby(ddf['item_id']).mean().compute()
но я не мог понять 1) как сделать групповую работу.квантиль — ни то, ни другое 2) как объединить эти результаты ?
Ответ №1:
import numpy as np
import pandas as pd
import dask.dataframe as dd
stats_df = ddf.groupby(['item_id'])['rating'].aggregate(['sum', 'mean', 'max', 'min', 'count'])
stats_df['rating_up'] = ddf['rating'].ge(3.75).groupby(ddf['item_id']).mean() * 100
stats_df['rating_down'] = ddf['rating'].lt(3.75).groupby(ddf['item_id']).mean() * 100
q25 = ddf.groupby('item_id')['rating'].apply(
lambda x: x.quantile(0.25))
q75 = ddf.groupby('item_id')['rating'].apply(
lambda x: x.quantile(0.75))
qdf = dd.merge(stats_df, q25, left_index=True, right_index=True)
edf = dd.merge(stats_df, q75, left_index=True, right_index=True)
ldf = dd.merge(qdf, edf[['rating']], left_index=True, right_index=True)
ldf.columns = ['sum', 'mean', 'max', 'min', 'count', 'rating_up', 'rating_down', 'q25', 'q75']
ldf.compute().reset_index().round(2)
Выход:
Out[24]:
item_id sum mean max min count rating_up rating_down q25 q75
0 8 6 2.0 3 1 3 0.00 100.00 1.5 2.5
1 10 9 3.0 4 2 3 33.33 66.67 2.5 3.5
В качестве альтернативы используйте dask.delayed для параллельного вычисления команд pandas. Согласно их документации, параллелизм достигается за счет большого количества отложенных вызовов. Таким образом, ваша агрегация разбита на несколько функций с задержкой ниже.
from dask import delayed
@delayed
def rating_up(x):
return x.groupby(['item_id']).agg(
rating_up=pd.NamedAgg(column='rating', aggfunc=lambda x: round(100 * sum(x>=3.75) / len(x), 2)))
@delayed
def rating_down(x):
return x.groupby(['item_id']).agg(
rating_down=pd.NamedAgg(column='rating', aggfunc=lambda x: round(100 * sum(x<3.75) / len(x), 2)))
@delayed
def q_25(x):
return x.groupby(['item_id']).agg(
rating_q25=pd.NamedAgg(column='rating', aggfunc=lambda x: np.quantile(x, q=0.25 )))
@delayed
def q_75(x):
return x.groupby(['item_id']).agg(
rating_q75=pd.NamedAgg(column='rating', aggfunc=lambda x: np.quantile(x, q=0.75 )))
@delayed
def rating_min(x):
return x.groupby(['item_id']).agg(
rating_min=pd.NamedAgg(column='rating', aggfunc='min'))
@delayed
def rating_max(x):
return x.groupby(['item_id']).agg(
rating_max=pd.NamedAgg(column='rating', aggfunc='max'))
@delayed
def rating_count(x):
return x.groupby(['item_id']).agg(
rating_count=pd.NamedAgg(column='rating', aggfunc='count'))
@delayed
def rating_avg(x):
return x.groupby(['item_id']).agg(
rating_avg=pd.NamedAgg(column='rating', aggfunc=np.mean))
def stats(x):
count_ = rating_count(x)
up = rating_up(x)
down = rating_down(x)
q25 = q_25(x)
q75 = q_75(x)
rate_min = rating_min(x)
rate_max = rating_max(x)
rate_avg = rating_avg(x)
ddf = count_.join(up).join(down).join(q25).join(q75).join(rate_min).
join(rate_max).join(rate_avg)
return ddf
stats_df = stats(df)
print(stats_df.compute().reset_index().round(2))
Выход:
item_id rating_count rating_up rating_down rating_q25 rating_q75 rating_min rating_max rating_avg
0 8 3 0.00 100.00 1.5 2.5 1 3 2
1 10 3 33.33 66.67 2.5 3.5 2 4 3
Комментарии:
1. Спасибо — можно ли использовать агрегатную функцию ? и избежать слияния ? (Я просто думаю о производительности — мои данные огромны) ? тебе не кажется, что это было бы умнее ?
2. Согласно документации Dask, объединение индексов происходит быстро. docs.dask.org/en/latest/dataframe-best-practices.html
3. Я пытался сделать что-то вроде — > ddf.groupby(‘item_id’).agg ({«рейтинг»: [«мин», «макс», «количество», «среднее», custom1, custom2, custom3]}) — это то, что вы можете добавить к своему ответу ? Я уверен, что будущие посетители по достоинству оценят это, так как подобных примеров не так много.
4. Не без того, чтобы потратить часы/дни, пытаясь сделать это. Квантильный метод Даска ведет себя довольно странно.
5. Я тоже ничего не понимаю. Я думаю, что часть фрейма данных dask выглядит немного странно из-за нескольких слияний. Однако настоящее решение-единственный способ, которым я мог бы заставить фрейм данных dask работать с вычислением квантиля. Все остальное приводило к ошибкам, и мне потребовалось добрых два часа, чтобы пройти их. Я собираюсь поднять этот вопрос как проблему с участниками Dask. Кстати, если ваши сводные статистические данные не слишком велики, вы можете вызывать метод вычисления после каждого вычисления статистики с помощью Dask, а затем объединять их с помощью pandas. 1 от меня также за хорошо сформулированный вопрос.