Как выполнить фильтр groupby в Dask

#dask

#dask

Вопрос:

Я пытаюсь взять фрейм данных dask, сгруппировать по столбцу ‘A’ и удалить группы, в которых меньше строк MIN_SAMPLE_COUNT.

Например, следующий код работает в pandas:

 import pandas as pd
import dask as da

MIN_SAMPLE_COUNT = 1

x = pd.DataFrame([[1,2,3], [1,5,6], [2,8,9], [1,3,5]])
x.columns = ['A', 'B', 'C']

grouped = x.groupby('A')
x = grouped.filter(lambda x: x['A'].count().astype(int) > MIN_SAMPLE_COUNT)
  

Однако, в Dask, если я попробую что-то аналогичное:

 import pandas as pd
import dask

MIN_SAMPLE_COUNT = 1

x = pd.DataFrame([[1,2,3], [1,5,6], [2,8,9], [1,3,5]])
x.columns = ['A', 'B', 'C']

x = dask.dataframe.from_pandas(x, npartitions=2)

grouped = x.groupby('A')
x = grouped.filter(lambda x: x['A'].count().astype(int) > MIN_SAMPLE_COUNT)
  

Я получаю следующее сообщение об ошибке:

 ---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
~AppDataLocalContinuumanaconda3libsite-packagesdaskdataframegroupby.py in __getattr__(self, key)
   1162         try:
-> 1163             return self[key]
   1164         except KeyError as e:

~AppDataLocalContinuumanaconda3libsite-packagesdaskdataframegroupby.py in __getitem__(self, key)
   1153         # error is raised from pandas
-> 1154         g._meta = g._meta[key]
   1155         return g

~AppDataLocalContinuumanaconda3libsite-packagespandascorebase.py in __getitem__(self, key)
    274             if key not in self.obj:
--> 275                 raise KeyError("Column not found: {key}".format(key=key))
    276             return self._gotitem(key, ndim=1)

KeyError: 'Column not found: filter'

During handling of the above exception, another exception occurred:

AttributeError                            Traceback (most recent call last)
<ipython-input-55-d8a969cc041b> in <module>()
      1 # Remove sixty second blocks that have fewer than MIN_SAMPLE_COUNT samples.
      2 grouped = dat.groupby('KPI_60_seconds')
----> 3 dat = grouped.filter(lambda x: x['KPI_60_seconds'].count().astype(int) > MIN_SAMPLE_COUNT)

~AppDataLocalContinuumanaconda3libsite-packagesdaskdataframegroupby.py in __getattr__(self, key)
   1163             return self[key]
   1164         except KeyError as e:
-> 1165             raise AttributeError(e)
   1166 
   1167     @derived_from(pd.core.groupby.DataFrameGroupBy)

AttributeError: 'Column not found: filter'
  

Сообщение об ошибке предполагает, что метод фильтрации, используемый в Pandas, не был реализован в Dask (и я не нашел его после поиска).

Существует ли функциональность Dask, которая отражает то, что я хочу сделать? Я прошел через Dask API, и ничто не выделилось для меня как то, что мне нужно. В настоящее время я использую Dask ‘1.1.1’

Спасибо за вашу помощь.

Ответ №1:

Я сам довольно новичок в Dask. Один из способов достижения, который вы пытаетесь достичь, может быть следующим:

Версия Dask: 0.17.3

 import pandas as pd
import dask.dataframe as dd

MIN_SAMPLE_COUNT = 1

x = pd.DataFrame([[1,2,3], [1,5,6], [2,8,9], [1,3,5]])
x.columns = ['A', 'B', 'C']
print("x (before):")
print(x)  # still pandas
x = dd.from_pandas(x, npartitions=2)

grouped = x.groupby('A').B.count().reset_index()

grouped = grouped.rename(columns={'B': 'Count'})

y = dd.merge(x, grouped, on=['A'])
y = y[y.Count > MIN_SAMPLE_COUNT]
x = y[['A', 'B', 'C']]
print("x (after):")
print(x.compute())  # needs compute for conversion to pandas df
  

Вывод:

 x (before):
   A  B  C
0  1  2  3
1  1  5  6
2  2  8  9
3  1  3  5
x (after):
   A  B  C
0  1  2  3
1  1  5  6
1  1  3  5