Фрейм данных Dask из файлов parquet: ошибка OS: не удалось десериализовать thrift: Исключение TProtocolException: недопустимые данные

#dask #parquet #pyarrow #dask-dataframe

#dask #parquet #pyarrow #dask-фрейм данных

Вопрос:

Я создаю фрейм данных Dask для последующего использования в алгоритме кластеризации, предоставляемом dask-ml. На предыдущем шаге моего конвейера я прочитал фрейм данных с диска, используя dask.dataframe.read_parquet , применил преобразование для добавления столбцов, используя map_partitions , а затем записал полученный фрейм данных обратно на диск, используя dask.dataframe.to_parquet . Проблема возникает, когда результирующий фрейм данных снова считывается и compute() вызывается.

Запуск следующего кода:

 # First step: make the Dask dataframe
ddf = ddf.map_partitions(partition_func)  # in this case, perform a df.apply, then pandas.concat with the original
ddf_output_path = pathlib.Path("./data/")  # Some directory
ddf_output_path.mkdir(parents=True, exist_ok=True)
dask.dataframe.to_parquet(ddf, ddf_output_path)  # Succeeds

# Second step: attempt to read and compute on the Dask dataframe
ddf = dask.dataframe.read_parquet(ddf_output_path)
print(ddf.columns)  # Produces the correct output
print(ddf.shape[0].compute())  # <-- fails here, for example
 

Выдает следующую трассировку:

  File "/home/ec2-user/pycharm_remote/pipeline/perform_clustering.py", line 32, in run
    print(ddf.shape[0].compute())
  File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/base.py", line 167, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/base.py", line 452, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/threaded.py", line 84, in get
    **kwargs
  File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/local.py", line 486, in get_async
    raise_exception(exc, tb)
  File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/local.py", line 316, in reraise
    raise exc
  File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/local.py", line 222, in execute_task
    result = _execute_task(task, data)
  File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 276, in read_parquet_part
    dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
  File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 276, in <listcomp>
    dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
  File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py", line 758, in read_partition
    piece, columns, partitions, **kwargs
  File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py", line 817, in _parquet_piece_as_arrow
    **kwargs.get("read", {}),
  File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/pyarrow/parquet.py", line 719, in read
    table = reader.read_row_group(self.row_group, **options)
  File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/pyarrow/parquet.py", line 272, in read_row_group
    use_threads=use_threads)
  File "pyarrow/_parquet.pyx", line 1080, in pyarrow._parquet.ParquetReader.read_row_group
  File "pyarrow/_parquet.pyx", line 1099, in pyarrow._parquet.ParquetReader.read_row_groups
  File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
OSError: Couldn't deserialize thrift: TProtocolException: Invalid data
Deserializing page header failed.
 

Среда — Amazon Linux 2, Python 3.7.9, с dask == 2.30.0, pyarrow == 2.0.0, pandas == 1.1.5, numpy == 1.19.4. Фрейм данных dask состоит из 404 столбцов и считывается примерно из 14 000 файлов parquet (разделов). Четыре столбца содержат элементы типа object (три содержат строки, один содержит вложенный список строк), в то время как остальные 400 содержат тип float64 .

Комментарии:

1. Одним из источников проблемы может быть то, что фреймы данных pandas, сгенерированные вызовом map_partitions, имели имена столбцов типа, отличного от string (целые числа от 0 до 399). Я получил это предупреждение от pyarrow: The DataFrame has column names of mixed type. They will be converted to strings and not roundtrip correctly.

2. Исправление имен столбцов не решило проблему.

3. Возможно, вы захотите попробовать с pyarrow < 2 на данный момент

4. Если вы вручную прочитали один файл, например, с помощью pandas ( pd.read_parquet("path/to/single/file.parquet") . Выдает ли это ту же ошибку?