Преобразуйте столбец категориальных данных в дополнительные столбцы

#python #dask #dask-dataframe

Вопрос:

У меня есть большой набор данных в виде следующего фрейма данных, который я ранее загрузил из файлов avro

отметка времени ID Категория ценность
2021-01-01 00:00:00 00:00 a d g
2021-01-01 00:10:00 00:00 a d h
2021-01-01 00:10:00 00:00 a e h
2021-01-01 00:00:00 00:00 b e h

Я хотел бы повернуть category столбец (который содержит порядка 50 различных категорий) и выполнить дедупликацию по столбцам timestamp и id , чтобы результат выглядел так

ID отметка времени d e
a 2021-01-01 00:00:00 00:00 g nan
a 2021-01-01 00:10:00 00:00 h h
b 2021-01-01 00:00:00 00:00 nan h

Я знаю, как бы я достиг этого при pandas использовании многоиндикаторов вместе с stack unstack операциями/, однако мой набор данных слишком велик для использования pandas без ручной пакетной обработки и dask не поддерживает многоиндикаторы. Есть ли какой-то способ сделать это эффективно ? dask

Редактировать:

Как отметил @Dahn, я создал минимальный синтетический пример с пандами:

 
import pandas as pd

records = [
    {'idx': 0, 'id': 'a', 'category': 'd', 'value': 1},
    {'idx': 1, 'id': 'a', 'category': 'e', 'value': 2},
    {'idx': 2, 'id': 'a', 'category': 'f', 'value': 3},
    {'idx': 0, 'id': 'b', 'category': 'd', 'value': 4},
    {'idx': 1, 'id': 'c', 'category': 'e', 'value': 5},
    {'idx': 2, 'id': 'c', 'category': 'f', 'value': 6}
]

frame = pd.DataFrame(records)
 
    idx id category  value
0    0  a        d      1
1    1  a        e      2
2    2  a        f      3
3    0  b        d      4
4    1  c        e      5
5    2  c        f      6
 
 frame = frame.set_index(['id', 'idx', 'category'], drop=True).unstack().droplevel(0, axis=1).reset_index()
frame.columns.name = ''
 
   id  idx    d    e    f
0  a    0  1.0  NaN  NaN
1  a    1  NaN  2.0  NaN
2  a    2  NaN  NaN  3.0
3  b    0  4.0  NaN  NaN
4  c    1  NaN  5.0  NaN
5  c    2  NaN  NaN  6.0


 

Комментарии:

1. Я действительно не понимаю вопроса. Я не вижу d e , откуда берутся столбцы и, но, возможно, упускаю что-то очевидное.

2.@Полномочия-это значения category столбца, которые поворачиваются, чтобы быть столбцами. Подумайте pandas' unstack об операции, применяемой к мультииндексу. pandas.pydata.org/pandas-docs/stable/reference/api/…

3. ИМО это помогло бы ответить на вопрос, если бы вы предоставили примеры данных в машиночитаемой форме и реализацию Pandas.

4. @Dahn Я добавил пример того, как выполнить такую операцию поворота с pandas помощью .

Ответ №1:

Я не верю, что Dask реализует это с октября 2021 года. Вероятно, это связано с тем, что нет поддержки многоиндексности, которая unstack требуется. Однако в последнее время над этим была проделана определенная работа.

Тем не менее, я думаю, что это все еще должно быть возможно с использованием парадигмы «применить-объединить-применить» (и apply_concat_apply функции).

Приведенное ниже решение работает для приведенного вами примера, и в принципе, я думаю, оно должно работать в целом, но я не уверен. Пожалуйста, действуйте с осторожностью и, если возможно, убедитесь, что результаты согласуются с тем, что дает вам Панды. Я также разместил это в качестве запроса на функцию на самом github Dask.

 import dask.dataframe as dd

# Create Dask DataFrame out of your `frame`
# npartitions is more than 1 to demonstrate this works on a partitioned datataset
df = dd.from_pandas(frame, npartitions=3)

# Dask needs to know work out what the categories are
# Alternatively you can use df.categorize
# See https://docs.dask.org/en/latest/dataframe-categoricals.html
category = 'category'
df[category] = df[category].astype(category).cat.as_known()

# Dask needs to know what the resulting DataFrame looks like
new_columns = pd.CategoricalIndex(df[category].cat.categories, name=category)
meta = pd.DataFrame(columns=new_columns, 
                    index=df._meta.set_index(['idx', 'id']).index)

# Implement using apply_concat_apply ("aca")
# More details: https://blog.dask.org/2019/10/08/df-groupby
def identity(x): return x

def my_unstack(x):
    return x.set_index(['id', 'idx', 'category'], drop=True).unstack()
    
def combine(x):
    return x.groupby(level=[0, 1]).sum()

result = dd.core.apply_concat_apply([df], 
                   chunk=identity, 
                   aggregate=my_unstack, 
                   combine=combine,
                   meta=meta)

result.compute()
 

Вариант В: map_partitions

Если вы уже можете сортировать данные по крайней мере по одному из idx или id , то вы также можете просто использовать map_partitions и обрабатывать каждый раздел как фрейм данных Pandas.

Это должно привести к значительному улучшению использования памяти и производительности в целом.

 # df has sorted index `idx` in this scenario

category = 'category'
existing_categories = df[category].astype(category).cat.as_known().cat.categories
categories = [('value', cat) for cat in existing_categories]

new_columns = pd.MultiIndex.from_tuples(categories, names=(None, category))

meta = pd.DataFrame(columns=new_columns, 
                    index=df._meta.set_index(['idx', 'id']).index)

def unstack_add_columns(x):
    x = x.set_index(['id', 'category'], append=True, drop=True).unstack()
    # make sure that result contains all necessary columns
    return x.reindex(columns=new_columns) 

df.map_partitions(unstack_add_columns, meta=meta)
 

Если вы не можете гарантировать, что idx будет отсортирован, вы можете попробовать что-то вроде

 df_sorted = df.set_index('idx')
# I recommend saving to disk in between set_index and the rest
df_sorted.to_parquet('data-sorted.parq')
 

но это само по себе может привести к проблемам с памятью.

Комментарии:

1. Я не знал об этой парадигме, спасибо! Я проверю это с разумным количеством данных и приму их, как только буду уверен, что это работает.

2. Спасибо, это, безусловно, поможет придать некоторую достоверность этому ответу.

3. На небольшом подмножестве данных это, кажется, работает идеально, хотя на больших подмножествах я всегда сталкиваюсь с проблемами нехватки памяти. Я пробовал играть с размером блока при загрузке данных из avro, но это не очень помогло. Считаете ли вы, что перераспределение может помочь перед выполнением этого шага поворота?

4. @sobek compute вычислит весь фрейм данных и вернет результат вычисления в вашу локальную память. Для больших наборов данных вы, вероятно, захотите вместо этого снова сохранить их на диске или обработать данные в более мелкой форме.

5. Да, каждая из функций в apply_concat_apply работает с кадрами данных Pandas

Ответ №2:

В качестве дополнения к ответу Дана, чтобы вернуться к неиндексированному многоуровневому фрейму, я сделал следующее:

 meta = pd.DataFrame(
        columns=['level_0', 'idx', 'id']   [x for x in existing_categories],
        index=df._meta.reset_index().index
    )

def reset_index(x):
    x = x.droplevel(0, axis=1)
    x.columns.name = None
    return x.reset_index()

df = df.map_partitions(reset_index, meta=meta).drop('level_0', axis=1)
 

Возможно, есть более элегантное решение для достижения этой цели, но оно работает для меня.