Pyarrow перезаписывает набор данных при использовании файловой системы S3

#parquet #pyarrow

Вопрос:

При записи двух файлов parquet локально в набор данных arrow может соответствующим образом добавлять их в разделы. Например, если я разделю два файла с помощью стрелки по столбцу A, стрелка создаст файловую структуру с вложенными папками, соответствующими каждому уникальному значению в столбце A, когда я напишу первый файл parquet с разделением. И когда записывается второй файл, стрелка достаточно умна, чтобы записать данные в нужный раздел. Поэтому, если файл один и два имеют общие значения в столбце А, я вижу два отдельных файла во вложенной папке с общим значением. Пример кода :

 df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_07.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, str(base    "parquet_dataset_partition_combined"), 
                    partition_cols=['PartitionPoint'])

df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_08.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, str(base    "parquet_dataset_partition_combined"), 
                    partition_cols=['PartitionPoint'])
 

это приводит к :

введите описание изображения здесь

введите описание изображения здесь

где две папки, созданные из-за столбца разделов, имеют количество элементов два [A и B], а подпапка PartitionPart=A содержит два файла, потому что оба файла actual_07 и actual_08 могут внести свой вклад в раздел ParitionPart=A

Однако этого не происходит, когда я использую точно такой же код, но использую S3 в качестве файловой системы. Код для этого выглядит следующим образом :

 from pyarrow import fs

s3  = fs.S3FileSystem(region="us-east-2")


df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_07.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, "parquet-storage", 
                    partition_cols=['PartitionPoint'],
                    filesystem=s3)

df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_08.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, "parquet-storage", 
                    partition_cols=['PartitionPoint'],
                   filesystem=s3)
 

Вместо этого я обнаружил, что вторая инструкция write перезаписывает данные в S3. Каждая часть раздела=папка всегда содержит только один файл за раз. Является ли это предостережением об использовании S3 в качестве моей файловой системы ?

Ответ №1:

Изменение здесь заключается в том, что вы неявно переключаетесь с прежнего модуля записи набора данных на новый модуль записи набора данных. pq.write_to_dataset по умолчанию будет использоваться устаревшее поведение. Однако, если файловая система предоставлена (устаревшее поведение не поддерживает это), то она будет использовать новое поведение:

     if use_legacy_dataset is None:
        # if a new filesystem is passed -> default to new implementation
        if isinstance(filesystem, FileSystem):
            use_legacy_dataset = False
        # otherwise the default is still True
        else:
            use_legacy_dataset = True
 

По умолчанию прежний автор по умолчанию называл файлы с помощью идентификатора GUID, и поэтому, если вы сделаете две записи (каждая запись содержит данные для каждой папки), вы получите два файла в каждой папке. Файлы имен beahvior нового автора по умолчанию с использованием счетчика (например part-{i}.extension ). Это означает, что несколько операций записи потенциально перезапишут существующие файлы (так как счетчик сбрасывается при каждом вызове write_to_dataset )

Чтобы получить такое поведение с помощью более нового средства записи набора pyarrow.dataset.write_dataset данных . Вам нужно будет использовать basename_template аргумент и создавать новый basename_template для каждой записи (простой способ сделать это-добавить uuid в шаблон). Например:

 ds.write_dataset(table, '/tmp/mydataset', filesystem=s3,
  partitioning=partitioning, basename_template=str(uuid.uuid4())   '-{i}',
  format='parquet')
 

Несколько вещей, на которые следует обратить внимание при переходе на новый формат:

  • format='parquet' — Новый писатель поддерживает запись нескольких форматов файлов, поэтому вам нужно указать parquet.
  • partitioning=partitioning — Новый писатель имеет более гибкий формат для указания схемы разделения. Чтобы получить старое поведение, вам понадобится привкус разделения в стиле улья:
 import pyarrow.dataset as ds
# Note, you have to supply a schema here and not just a list of columns.
# However, this is hopefully changing in part of 6.0 so you can take
# an approach similar to the old style of just specifying column
# names (ARROW-13755).
partitioning = ds.partitioning(schema=pa.schema([pa.field('PartitioningPoint', type=pa.string())]))
 

Комментарии:

1. спасибо, Пейс, за ответ и объяснение, хотя я понимаю, как работает эта настройка, похоже, что аргумент partition_filename_cb больше не поддерживается pyarrow

2. @AbhishekMalik Не могли бы вы помочь мне понять, что вы под этим подразумеваете? Я почти уверен, что и старый API, и более новый API должны поддерживать его, и я действительно запустил фрагмент, который я предоставил, с образцами данных. Вы получили какую-то ошибку?

3. да, к сожалению, я получил следующую ошибку : Ошибка ValueError: аргумент ‘partition_filename_cb’ не поддерживается реализацией нового набора данных. Моя версия pyarrow-4.0.1

4. и вот код, который я запускаю : pq.write_to_dataset(таблица, «паркетное хранилище», partition_cols=[‘Точка раздела’], файловая система=s3, имя файла раздела _cb=лямбда _: str(uuid.uuid4()))

5. Ах…Я вижу, где я допустил ошибку в своем тестировании. Я считаю, что это означает, что вам нужно будет использовать новые методы API. Я обновлю свой пример.