Как исправить ошибку «Столбцы в вычисленных данных не соответствуют столбцам в предоставленных метаданных»?

#pivot-table #dask

#сводная таблица #dask

Вопрос:

У меня есть таблица обновлений цен в формате (временная метка, цена, сумма). Временная метка представляет собой datetime, ценовую категорию и плавающую сумму64. Столбец метки времени задается как индекс. Моя цель — получить сумму, доступную при каждом уровне цен в каждый момент времени. Сначала я использую сводную таблицу для распределения цен по столбцам, а затем пересылаю заполнение.

 pivot = price_table.pivot_table(index = 'timestamp', 
columns = 'price', values = 'amount')
pivot_ffill = pivot.fillna(method = 'ffill')
  

Я могу compute применить head к pivot_ffill , и это работает нормально.
Очевидно, что в начале таблицы все еще есть NAS, где еще не было обновлений.
Когда я применяю

 pivot_nullfill = pivot_ffill.fillna(0)
pivot_nullfill.head()
  

Я получаю ошибку
The columns in the computed data do not match the columns in the provided metadata . Я попытался заменить ноль на 0.0 или float(0) , но безрезультатно. Поскольку предыдущие шаги выполняются, я сильно подозреваю, что это как-то связано с fillna , но из-за задержки вычислений это не обязательно должно быть правдой.

Кто-нибудь знает, что вызывает это? Спасибо!

 ---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-180-f8ab344c7939> in <module>
----> 1 pivot_ffill.fillna(0).head()

C:ProgramDataAnaconda3envspython36libsite-packagesdaskdataframecore.py in head(self, n, npartitions, compute)
    896 
    897         if compute:
--> 898             result = result.compute()
    899         return result
    900 

C:ProgramDataAnaconda3envspython36libsite-packagesdaskbase.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

C:ProgramDataAnaconda3envspython36libsite-packagesdaskbase.py in compute(*args, **kwargs)
    396     keys = [x.__dask_keys__() for x in collections]
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398     results = schedule(dsk, keys, **kwargs)
    399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400 

C:ProgramDataAnaconda3envspython36libsite-packagesdaskthreaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     74     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
     75                         cache=cache, get_id=_thread_get_id,
---> 76                         pack_exception=pack_exception, **kwargs)
     77 
     78     # Cleanup pools associated to dead threads

C:ProgramDataAnaconda3envspython36libsite-packagesdasklocal.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    460                         _execute_task(task, data)  # Re-execute locally
    461                     else:
--> 462                         raise_exception(exc, tb)
    463                 res, worker_id = loads(res_info)
    464                 state['cache'][key] = res

C:ProgramDataAnaconda3envspython36libsite-packagesdaskcompatibility.py in reraise(exc, tb)
    110         if exc.__traceback__ is not tb:
    111             raise exc.with_traceback(tb)
--> 112         raise exc
    113 
    114     import pickle as cPickle

C:ProgramDataAnaconda3envspython36libsite-packagesdasklocal.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    228     try:
    229         task, data = loads(task_info)
--> 230         result = _execute_task(task, data)
    231         id = get_id()
    232         result = dumps((result, id))

C:ProgramDataAnaconda3envspython36libsite-packagesdaskcore.py in _execute_task(arg, cache, dsk)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):

C:ProgramDataAnaconda3envspython36libsite-packagesdaskcore.py in <listcomp>(.0)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):

C:ProgramDataAnaconda3envspython36libsite-packagesdaskcore.py in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

C:ProgramDataAnaconda3envspython36libsite-packagesdaskoptimization.py in __call__(self, *args)
    940                              % (len(self.inkeys), len(args)))
    941         return core.get(self.dsk, self.outkey,
--> 942                         dict(zip(self.inkeys, args)))
    943 
    944     def __reduce__(self):

C:ProgramDataAnaconda3envspython36libsite-packagesdaskcore.py in get(dsk, out, cache)
    147     for key in toposort(dsk):
    148         task = dsk[key]
--> 149         result = _execute_task(task, cache)
    150         cache[key] = result
    151     result = _execute_task(out, cache)

C:ProgramDataAnaconda3envspython36libsite-packagesdaskcore.py in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

C:ProgramDataAnaconda3envspython36libsite-packagesdaskcompatibility.py in apply(func, args, kwargs)
     91     def apply(func, args, kwargs=None):
     92         if kwargs:
---> 93             return func(*args, **kwargs)
     94         else:
     95             return func(*args)

C:ProgramDataAnaconda3envspython36libsite-packagesdaskdataframecore.py in apply_and_enforce(*args, **kwargs)
   3800             if not np.array_equal(np.nan_to_num(meta.columns),
   3801                                   np.nan_to_num(df.columns)):
-> 3802                 raise ValueError("The columns in the computed data do not match"
   3803                                  " the columns in the provided metadata")
   3804             else:

ValueError: The columns in the computed data do not match the columns in the provided metadata
  

Ответ №1:

Сообщение об ошибке должно было подсказать вам, как исправить ситуацию. Мы предполагаем, что вы загружаетесь из CSV (в вопросе не указано), поэтому, вероятно, в итоге вы получите строку типа

 df = dd.read_csv(..., dtype={...})
  

который инструктирует читателя pandas относительно dtypes, которые вы хотите применить, поскольку вы знаете больше информации, чем pandas. Это гарантирует, что все разделы будут иметь одинаковые типы для всех столбцов — см. раздел Примечания к документам.

Комментарии:

1. На самом деле я загружал фрейм данных pandas в dask с помощью from_pandas . У этого метода нет dtype аргумента. Есть ли способ обойти сохранение в csv и перезагрузку?

2. Запись и загрузка в CSV, похоже, теперь работает. Еще раз спасибо!