#parquet #pyarrow
Вопрос:
При записи двух файлов parquet локально в набор данных arrow может соответствующим образом добавлять их в разделы. Например, если я разделю два файла с помощью стрелки по столбцу A, стрелка создаст файловую структуру с вложенными папками, соответствующими каждому уникальному значению в столбце A, когда я напишу первый файл parquet с разделением. И когда записывается второй файл, стрелка достаточно умна, чтобы записать данные в нужный раздел. Поэтому, если файл один и два имеют общие значения в столбце А, я вижу два отдельных файла во вложенной папке с общим значением. Пример кода :
df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_07.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, str(base "parquet_dataset_partition_combined"),
partition_cols=['PartitionPoint'])
df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_08.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, str(base "parquet_dataset_partition_combined"),
partition_cols=['PartitionPoint'])
это приводит к :
где две папки, созданные из-за столбца разделов, имеют количество элементов два [A и B], а подпапка PartitionPart=A содержит два файла, потому что оба файла actual_07 и actual_08 могут внести свой вклад в раздел ParitionPart=A
Однако этого не происходит, когда я использую точно такой же код, но использую S3 в качестве файловой системы. Код для этого выглядит следующим образом :
from pyarrow import fs
s3 = fs.S3FileSystem(region="us-east-2")
df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_07.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, "parquet-storage",
partition_cols=['PartitionPoint'],
filesystem=s3)
df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_08.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, "parquet-storage",
partition_cols=['PartitionPoint'],
filesystem=s3)
Вместо этого я обнаружил, что вторая инструкция write перезаписывает данные в S3. Каждая часть раздела=папка всегда содержит только один файл за раз. Является ли это предостережением об использовании S3 в качестве моей файловой системы ?
Ответ №1:
Изменение здесь заключается в том, что вы неявно переключаетесь с прежнего модуля записи набора данных на новый модуль записи набора данных. pq.write_to_dataset
по умолчанию будет использоваться устаревшее поведение. Однако, если файловая система предоставлена (устаревшее поведение не поддерживает это), то она будет использовать новое поведение:
if use_legacy_dataset is None:
# if a new filesystem is passed -> default to new implementation
if isinstance(filesystem, FileSystem):
use_legacy_dataset = False
# otherwise the default is still True
else:
use_legacy_dataset = True
По умолчанию прежний автор по умолчанию называл файлы с помощью идентификатора GUID, и поэтому, если вы сделаете две записи (каждая запись содержит данные для каждой папки), вы получите два файла в каждой папке. Файлы имен beahvior нового автора по умолчанию с использованием счетчика (например part-{i}.extension
). Это означает, что несколько операций записи потенциально перезапишут существующие файлы (так как счетчик сбрасывается при каждом вызове write_to_dataset
)
Чтобы получить такое поведение с помощью более нового средства записи набора pyarrow.dataset.write_dataset
данных . Вам нужно будет использовать basename_template
аргумент и создавать новый basename_template
для каждой записи (простой способ сделать это-добавить uuid в шаблон). Например:
ds.write_dataset(table, '/tmp/mydataset', filesystem=s3,
partitioning=partitioning, basename_template=str(uuid.uuid4()) '-{i}',
format='parquet')
Несколько вещей, на которые следует обратить внимание при переходе на новый формат:
format='parquet'
— Новый писатель поддерживает запись нескольких форматов файлов, поэтому вам нужно указать parquet.partitioning=partitioning
— Новый писатель имеет более гибкий формат для указания схемы разделения. Чтобы получить старое поведение, вам понадобится привкус разделения в стиле улья:
import pyarrow.dataset as ds
# Note, you have to supply a schema here and not just a list of columns.
# However, this is hopefully changing in part of 6.0 so you can take
# an approach similar to the old style of just specifying column
# names (ARROW-13755).
partitioning = ds.partitioning(schema=pa.schema([pa.field('PartitioningPoint', type=pa.string())]))
Комментарии:
1. спасибо, Пейс, за ответ и объяснение, хотя я понимаю, как работает эта настройка, похоже, что аргумент partition_filename_cb больше не поддерживается pyarrow
2. @AbhishekMalik Не могли бы вы помочь мне понять, что вы под этим подразумеваете? Я почти уверен, что и старый API, и более новый API должны поддерживать его, и я действительно запустил фрагмент, который я предоставил, с образцами данных. Вы получили какую-то ошибку?
3. да, к сожалению, я получил следующую ошибку : Ошибка ValueError: аргумент ‘partition_filename_cb’ не поддерживается реализацией нового набора данных. Моя версия pyarrow-4.0.1
4. и вот код, который я запускаю : pq.write_to_dataset(таблица, «паркетное хранилище», partition_cols=[‘Точка раздела’], файловая система=s3, имя файла раздела _cb=лямбда _: str(uuid.uuid4()))
5. Ах…Я вижу, где я допустил ошибку в своем тестировании. Я считаю, что это означает, что вам нужно будет использовать новые методы API. Я обновлю свой пример.