Несогласованная схема в apache arrow

#python-3.x #batch-processing #parquet #pyarrow

#python-3.x #пакетная обработка #паркет #pyarrow

Вопрос:

Основной вопрос: При обработке пакета данных за пакет, как обрабатывать изменение схемы в pyarrow?

Длинная история:

В качестве примера у меня есть следующие данные

 | col_a | col_b |
-----------------
|  10   |  42   |
|  41   |  21   |
| 'foo' |  11   |
| 'bar' |  99   |
  

Я работаю с python 3.7 и использую pandas 1.1.0.

 >>> import pandas as pd
>>> df = pd.read_csv('test.csv')
>>> df
  col_a  col_b
0    10     42
1    41     21
2   foo     11
3   bar     99
>>> df.dtypes
col_a    object
col_b     int64
dtype: object  
>>>
  

Мне нужно начать работать с Apache Arrow, используя реализацию PyArrow 1.0.1. В моем приложении мы работаем пакет за пакетом. Это означает, что мы видим часть данных, следовательно, часть типов данных.

 >>> dfi = pd.read_csv('test.csv', iterator=True, chunksize=2)
>>> dfi
<pandas.io.parsers.TextFileReader object at 0x7fabae915c50>
>>> dfg = next(dfi)
>>> dfg
   col_a  col_b
0     10     42
1     41     21
>>> sub_1 = next(dfi)
>>> sub_2 = next(dfi)
>>> sub_1
  col_a  col_b
2   foo     11
3   bar     99
>>> dfg2
  col_a  col_b
2   foo     11
3   bar     99
>>> sub_1.dtypes
col_a    int64
col_b    int64
dtype: object 
>>> sub_2.dtypes
col_a    object
col_b     int64
dtype: object  
>>>
  

Моя цель — сохранить весь этот фрейм данных, используя формат parquet Apache Arrow в ограничении рабочего пакета для каждого пакета. Это требует от нас правильного заполнения схемы. Как обрабатывать dtypes, которые меняются по пакетам?
Вот полный код для воспроизведения проблемы с использованием приведенных выше данных.

 from pyarrow import RecordBatch, RecordBatchFileWriter, RecordBatchFileReader
import pandas as pd

pd.DataFrame([['10', 42], ['41', 21], ['foo', 11], ['bar', 99]], columns=['col_a', 'col_b']).to_csv('test.csv')
dfi = pd.read_csv('test.csv', iterator=True, chunksize=2)
sub_1 = next(dfi)
sub_2 = next(dfi)

# No schema provided here. Pyarrow should infer the schema from data. The first column is identified as a col of int.
batch_to_write_1 = RecordBatch.from_pandas(sub_1)
schema = batch_to_write_1.schema
writer = RecordBatchFileWriter('test.parquet', schema)
writer.write(batch_to_write_1)

# We expect to keep the same schema but that is not true, the schema does not match sub_2 data. So the
# following line launch an exception.
batch_to_write_2 = RecordBatch.from_pandas(sub_2, schema)
# writer.write(batch_to_write_2)  # This will fail bcs batch_to_write_2 is not defined
  

Мы получаем следующее исключение

 Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pyarrow/table.pxi", line 858, in pyarrow.lib.RecordBatch.from_pandas
  File "/mnt/e/miniconda/envs/pandas/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 579, in dataframe_to_arrays
    for c, f in zip(columns_to_convert, convert_fields)]
  File "/mnt/e/miniconda/envs/pandas/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 579, in <listcomp>
    for c, f in zip(columns_to_convert, convert_fields)]
  File "/mnt/e/miniconda/envs/pandas/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 559, in convert_column     
    result = pa.array(col, type=type_, from_pandas=True, safe=safe)
  File "pyarrow/array.pxi", line 265, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
TypeError: an integer is required (got type str)
  

Ответ №1:

Такое поведение предназначено. Попробуйте несколько альтернатив (я считаю, что они должны работать, но я не тестировал их все):

  1. Если вы заранее знаете окончательную схему, создайте ее вручную в pyarrow вместо того, чтобы полагаться на выведенную из первого пакета записей.
  2. Просмотрите все данные и вычислите окончательную схему. Затем повторно обработайте данные с помощью новой схемы.
  3. Обнаружьте изменение схемы и переделайте предыдущие пакеты записей.
  4. Обнаружьте изменение схемы и запустите новую таблицу (тогда у вас будет один файл parquet для каждой схемы, и вам понадобится другой процесс для объединения схем).

Наконец, если это работает, и вы пытаетесь преобразовать данные CSV, вы можете рассмотреть возможность использования встроенного анализатора Arrow CSV.