#pivot-table #dask
#сводная таблица #dask
Вопрос:
У меня есть таблица обновлений цен в формате (временная метка, цена, сумма). Временная метка представляет собой datetime, ценовую категорию и плавающую сумму64. Столбец метки времени задается как индекс. Моя цель — получить сумму, доступную при каждом уровне цен в каждый момент времени. Сначала я использую сводную таблицу для распределения цен по столбцам, а затем пересылаю заполнение.
pivot = price_table.pivot_table(index = 'timestamp',
columns = 'price', values = 'amount')
pivot_ffill = pivot.fillna(method = 'ffill')
Я могу compute
применить head
к pivot_ffill
, и это работает нормально.
Очевидно, что в начале таблицы все еще есть NAS, где еще не было обновлений.
Когда я применяю
pivot_nullfill = pivot_ffill.fillna(0)
pivot_nullfill.head()
Я получаю ошибку
The columns in the computed data do not match the columns in the provided metadata
. Я попытался заменить ноль на 0.0
или float(0)
, но безрезультатно. Поскольку предыдущие шаги выполняются, я сильно подозреваю, что это как-то связано с fillna
, но из-за задержки вычислений это не обязательно должно быть правдой.
Кто-нибудь знает, что вызывает это? Спасибо!
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-180-f8ab344c7939> in <module>
----> 1 pivot_ffill.fillna(0).head()
C:ProgramDataAnaconda3envspython36libsite-packagesdaskdataframecore.py in head(self, n, npartitions, compute)
896
897 if compute:
--> 898 result = result.compute()
899 return result
900
C:ProgramDataAnaconda3envspython36libsite-packagesdaskbase.py in compute(self, **kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158
C:ProgramDataAnaconda3envspython36libsite-packagesdaskbase.py in compute(*args, **kwargs)
396 keys = [x.__dask_keys__() for x in collections]
397 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398 results = schedule(dsk, keys, **kwargs)
399 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
400
C:ProgramDataAnaconda3envspython36libsite-packagesdaskthreaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
74 results = get_async(pool.apply_async, len(pool._pool), dsk, result,
75 cache=cache, get_id=_thread_get_id,
---> 76 pack_exception=pack_exception, **kwargs)
77
78 # Cleanup pools associated to dead threads
C:ProgramDataAnaconda3envspython36libsite-packagesdasklocal.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
460 _execute_task(task, data) # Re-execute locally
461 else:
--> 462 raise_exception(exc, tb)
463 res, worker_id = loads(res_info)
464 state['cache'][key] = res
C:ProgramDataAnaconda3envspython36libsite-packagesdaskcompatibility.py in reraise(exc, tb)
110 if exc.__traceback__ is not tb:
111 raise exc.with_traceback(tb)
--> 112 raise exc
113
114 import pickle as cPickle
C:ProgramDataAnaconda3envspython36libsite-packagesdasklocal.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
228 try:
229 task, data = loads(task_info)
--> 230 result = _execute_task(task, data)
231 id = get_id()
232 result = dumps((result, id))
C:ProgramDataAnaconda3envspython36libsite-packagesdaskcore.py in _execute_task(arg, cache, dsk)
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
--> 118 args2 = [_execute_task(a, cache) for a in args]
119 return func(*args2)
120 elif not ishashable(arg):
C:ProgramDataAnaconda3envspython36libsite-packagesdaskcore.py in <listcomp>(.0)
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
--> 118 args2 = [_execute_task(a, cache) for a in args]
119 return func(*args2)
120 elif not ishashable(arg):
C:ProgramDataAnaconda3envspython36libsite-packagesdaskcore.py in _execute_task(arg, cache, dsk)
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg
C:ProgramDataAnaconda3envspython36libsite-packagesdaskoptimization.py in __call__(self, *args)
940 % (len(self.inkeys), len(args)))
941 return core.get(self.dsk, self.outkey,
--> 942 dict(zip(self.inkeys, args)))
943
944 def __reduce__(self):
C:ProgramDataAnaconda3envspython36libsite-packagesdaskcore.py in get(dsk, out, cache)
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)
C:ProgramDataAnaconda3envspython36libsite-packagesdaskcore.py in _execute_task(arg, cache, dsk)
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg
C:ProgramDataAnaconda3envspython36libsite-packagesdaskcompatibility.py in apply(func, args, kwargs)
91 def apply(func, args, kwargs=None):
92 if kwargs:
---> 93 return func(*args, **kwargs)
94 else:
95 return func(*args)
C:ProgramDataAnaconda3envspython36libsite-packagesdaskdataframecore.py in apply_and_enforce(*args, **kwargs)
3800 if not np.array_equal(np.nan_to_num(meta.columns),
3801 np.nan_to_num(df.columns)):
-> 3802 raise ValueError("The columns in the computed data do not match"
3803 " the columns in the provided metadata")
3804 else:
ValueError: The columns in the computed data do not match the columns in the provided metadata
Ответ №1:
Сообщение об ошибке должно было подсказать вам, как исправить ситуацию. Мы предполагаем, что вы загружаетесь из CSV (в вопросе не указано), поэтому, вероятно, в итоге вы получите строку типа
df = dd.read_csv(..., dtype={...})
который инструктирует читателя pandas относительно dtypes, которые вы хотите применить, поскольку вы знаете больше информации, чем pandas. Это гарантирует, что все разделы будут иметь одинаковые типы для всех столбцов — см. раздел Примечания к документам.
Комментарии:
1. На самом деле я загружал фрейм данных pandas в dask с помощью
from_pandas
. У этого метода нетdtype
аргумента. Есть ли способ обойти сохранение в csv и перезагрузку?2. Запись и загрузка в CSV, похоже, теперь работает. Еще раз спасибо!