#python #dask #dask-dataframe
Вопрос:
У меня есть большой набор данных в виде следующего фрейма данных, который я ранее загрузил из файлов avro
отметка времени | ID | Категория | ценность |
---|---|---|---|
2021-01-01 00:00:00 00:00 | a | d | g |
2021-01-01 00:10:00 00:00 | a | d | h |
2021-01-01 00:10:00 00:00 | a | e | h |
2021-01-01 00:00:00 00:00 | b | e | h |
Я хотел бы повернуть category
столбец (который содержит порядка 50 различных категорий) и выполнить дедупликацию по столбцам timestamp
и id
, чтобы результат выглядел так
ID | отметка времени | d | e |
---|---|---|---|
a | 2021-01-01 00:00:00 00:00 | g | nan |
a | 2021-01-01 00:10:00 00:00 | h | h |
b | 2021-01-01 00:00:00 00:00 | nan | h |
Я знаю, как бы я достиг этого при pandas
использовании многоиндикаторов вместе с stack
unstack
операциями/, однако мой набор данных слишком велик для использования pandas
без ручной пакетной обработки и dask
не поддерживает многоиндикаторы. Есть ли какой-то способ сделать это эффективно ? dask
Редактировать:
Как отметил @Dahn, я создал минимальный синтетический пример с пандами:
import pandas as pd
records = [
{'idx': 0, 'id': 'a', 'category': 'd', 'value': 1},
{'idx': 1, 'id': 'a', 'category': 'e', 'value': 2},
{'idx': 2, 'id': 'a', 'category': 'f', 'value': 3},
{'idx': 0, 'id': 'b', 'category': 'd', 'value': 4},
{'idx': 1, 'id': 'c', 'category': 'e', 'value': 5},
{'idx': 2, 'id': 'c', 'category': 'f', 'value': 6}
]
frame = pd.DataFrame(records)
idx id category value
0 0 a d 1
1 1 a e 2
2 2 a f 3
3 0 b d 4
4 1 c e 5
5 2 c f 6
frame = frame.set_index(['id', 'idx', 'category'], drop=True).unstack().droplevel(0, axis=1).reset_index()
frame.columns.name = ''
id idx d e f
0 a 0 1.0 NaN NaN
1 a 1 NaN 2.0 NaN
2 a 2 NaN NaN 3.0
3 b 0 4.0 NaN NaN
4 c 1 NaN 5.0 NaN
5 c 2 NaN NaN 6.0
Комментарии:
1. Я действительно не понимаю вопроса. Я не вижу
d
e
, откуда берутся столбцы и, но, возможно, упускаю что-то очевидное.2.@Полномочия-это значения
category
столбца, которые поворачиваются, чтобы быть столбцами. Подумайтеpandas'
unstack
об операции, применяемой к мультииндексу. pandas.pydata.org/pandas-docs/stable/reference/api/…3. ИМО это помогло бы ответить на вопрос, если бы вы предоставили примеры данных в машиночитаемой форме и реализацию Pandas.
4. @Dahn Я добавил пример того, как выполнить такую операцию поворота с
pandas
помощью .
Ответ №1:
Я не верю, что Dask реализует это с октября 2021 года. Вероятно, это связано с тем, что нет поддержки многоиндексности, которая unstack
требуется. Однако в последнее время над этим была проделана определенная работа.
Тем не менее, я думаю, что это все еще должно быть возможно с использованием парадигмы «применить-объединить-применить» (и apply_concat_apply
функции).
Приведенное ниже решение работает для приведенного вами примера, и в принципе, я думаю, оно должно работать в целом, но я не уверен. Пожалуйста, действуйте с осторожностью и, если возможно, убедитесь, что результаты согласуются с тем, что дает вам Панды. Я также разместил это в качестве запроса на функцию на самом github Dask.
import dask.dataframe as dd
# Create Dask DataFrame out of your `frame`
# npartitions is more than 1 to demonstrate this works on a partitioned datataset
df = dd.from_pandas(frame, npartitions=3)
# Dask needs to know work out what the categories are
# Alternatively you can use df.categorize
# See https://docs.dask.org/en/latest/dataframe-categoricals.html
category = 'category'
df[category] = df[category].astype(category).cat.as_known()
# Dask needs to know what the resulting DataFrame looks like
new_columns = pd.CategoricalIndex(df[category].cat.categories, name=category)
meta = pd.DataFrame(columns=new_columns,
index=df._meta.set_index(['idx', 'id']).index)
# Implement using apply_concat_apply ("aca")
# More details: https://blog.dask.org/2019/10/08/df-groupby
def identity(x): return x
def my_unstack(x):
return x.set_index(['id', 'idx', 'category'], drop=True).unstack()
def combine(x):
return x.groupby(level=[0, 1]).sum()
result = dd.core.apply_concat_apply([df],
chunk=identity,
aggregate=my_unstack,
combine=combine,
meta=meta)
result.compute()
Вариант В: map_partitions
Если вы уже можете сортировать данные по крайней мере по одному из idx
или id
, то вы также можете просто использовать map_partitions
и обрабатывать каждый раздел как фрейм данных Pandas.
Это должно привести к значительному улучшению использования памяти и производительности в целом.
# df has sorted index `idx` in this scenario
category = 'category'
existing_categories = df[category].astype(category).cat.as_known().cat.categories
categories = [('value', cat) for cat in existing_categories]
new_columns = pd.MultiIndex.from_tuples(categories, names=(None, category))
meta = pd.DataFrame(columns=new_columns,
index=df._meta.set_index(['idx', 'id']).index)
def unstack_add_columns(x):
x = x.set_index(['id', 'category'], append=True, drop=True).unstack()
# make sure that result contains all necessary columns
return x.reindex(columns=new_columns)
df.map_partitions(unstack_add_columns, meta=meta)
Если вы не можете гарантировать, что idx будет отсортирован, вы можете попробовать что-то вроде
df_sorted = df.set_index('idx')
# I recommend saving to disk in between set_index and the rest
df_sorted.to_parquet('data-sorted.parq')
но это само по себе может привести к проблемам с памятью.
Комментарии:
1. Я не знал об этой парадигме, спасибо! Я проверю это с разумным количеством данных и приму их, как только буду уверен, что это работает.
2. Спасибо, это, безусловно, поможет придать некоторую достоверность этому ответу.
3. На небольшом подмножестве данных это, кажется, работает идеально, хотя на больших подмножествах я всегда сталкиваюсь с проблемами нехватки памяти. Я пробовал играть с размером блока при загрузке данных из avro, но это не очень помогло. Считаете ли вы, что перераспределение может помочь перед выполнением этого шага поворота?
4. @sobek
compute
вычислит весь фрейм данных и вернет результат вычисления в вашу локальную память. Для больших наборов данных вы, вероятно, захотите вместо этого снова сохранить их на диске или обработать данные в более мелкой форме.5. Да, каждая из функций в
apply_concat_apply
работает с кадрами данных Pandas
Ответ №2:
В качестве дополнения к ответу Дана, чтобы вернуться к неиндексированному многоуровневому фрейму, я сделал следующее:
meta = pd.DataFrame(
columns=['level_0', 'idx', 'id'] [x for x in existing_categories],
index=df._meta.reset_index().index
)
def reset_index(x):
x = x.droplevel(0, axis=1)
x.columns.name = None
return x.reset_index()
df = df.map_partitions(reset_index, meta=meta).drop('level_0', axis=1)
Возможно, есть более элегантное решение для достижения этой цели, но оно работает для меня.