#python-3.x #batch-processing #parquet #pyarrow
#python-3.x #пакетная обработка #паркет #pyarrow
Вопрос:
Основной вопрос: При обработке пакета данных за пакет, как обрабатывать изменение схемы в pyarrow?
Длинная история:
В качестве примера у меня есть следующие данные
| col_a | col_b |
-----------------
| 10 | 42 |
| 41 | 21 |
| 'foo' | 11 |
| 'bar' | 99 |
Я работаю с python 3.7 и использую pandas 1.1.0.
>>> import pandas as pd
>>> df = pd.read_csv('test.csv')
>>> df
col_a col_b
0 10 42
1 41 21
2 foo 11
3 bar 99
>>> df.dtypes
col_a object
col_b int64
dtype: object
>>>
Мне нужно начать работать с Apache Arrow, используя реализацию PyArrow 1.0.1. В моем приложении мы работаем пакет за пакетом. Это означает, что мы видим часть данных, следовательно, часть типов данных.
>>> dfi = pd.read_csv('test.csv', iterator=True, chunksize=2)
>>> dfi
<pandas.io.parsers.TextFileReader object at 0x7fabae915c50>
>>> dfg = next(dfi)
>>> dfg
col_a col_b
0 10 42
1 41 21
>>> sub_1 = next(dfi)
>>> sub_2 = next(dfi)
>>> sub_1
col_a col_b
2 foo 11
3 bar 99
>>> dfg2
col_a col_b
2 foo 11
3 bar 99
>>> sub_1.dtypes
col_a int64
col_b int64
dtype: object
>>> sub_2.dtypes
col_a object
col_b int64
dtype: object
>>>
Моя цель — сохранить весь этот фрейм данных, используя формат parquet Apache Arrow в ограничении рабочего пакета для каждого пакета. Это требует от нас правильного заполнения схемы. Как обрабатывать dtypes, которые меняются по пакетам?
Вот полный код для воспроизведения проблемы с использованием приведенных выше данных.
from pyarrow import RecordBatch, RecordBatchFileWriter, RecordBatchFileReader
import pandas as pd
pd.DataFrame([['10', 42], ['41', 21], ['foo', 11], ['bar', 99]], columns=['col_a', 'col_b']).to_csv('test.csv')
dfi = pd.read_csv('test.csv', iterator=True, chunksize=2)
sub_1 = next(dfi)
sub_2 = next(dfi)
# No schema provided here. Pyarrow should infer the schema from data. The first column is identified as a col of int.
batch_to_write_1 = RecordBatch.from_pandas(sub_1)
schema = batch_to_write_1.schema
writer = RecordBatchFileWriter('test.parquet', schema)
writer.write(batch_to_write_1)
# We expect to keep the same schema but that is not true, the schema does not match sub_2 data. So the
# following line launch an exception.
batch_to_write_2 = RecordBatch.from_pandas(sub_2, schema)
# writer.write(batch_to_write_2) # This will fail bcs batch_to_write_2 is not defined
Мы получаем следующее исключение
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "pyarrow/table.pxi", line 858, in pyarrow.lib.RecordBatch.from_pandas
File "/mnt/e/miniconda/envs/pandas/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 579, in dataframe_to_arrays
for c, f in zip(columns_to_convert, convert_fields)]
File "/mnt/e/miniconda/envs/pandas/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 579, in <listcomp>
for c, f in zip(columns_to_convert, convert_fields)]
File "/mnt/e/miniconda/envs/pandas/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 559, in convert_column
result = pa.array(col, type=type_, from_pandas=True, safe=safe)
File "pyarrow/array.pxi", line 265, in pyarrow.lib.array
File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
TypeError: an integer is required (got type str)
Ответ №1:
Такое поведение предназначено. Попробуйте несколько альтернатив (я считаю, что они должны работать, но я не тестировал их все):
- Если вы заранее знаете окончательную схему, создайте ее вручную в pyarrow вместо того, чтобы полагаться на выведенную из первого пакета записей.
- Просмотрите все данные и вычислите окончательную схему. Затем повторно обработайте данные с помощью новой схемы.
- Обнаружьте изменение схемы и переделайте предыдущие пакеты записей.
- Обнаружьте изменение схемы и запустите новую таблицу (тогда у вас будет один файл parquet для каждой схемы, и вам понадобится другой процесс для объединения схем).
Наконец, если это работает, и вы пытаетесь преобразовать данные CSV, вы можете рассмотреть возможность использования встроенного анализатора Arrow CSV.