PyArrow: чтение одного файла из секционированного набора данных parquet неожиданно медленно

#python #pandas #parquet #pyarrow

#python #pandas #parquet #pyarrow

Вопрос:

У меня возникли некоторые проблемы со скоростью загрузки .parquet файлов. Однако я не знаю, что я делаю не так.

Проблема

Я пытаюсь прочитать один .parquet файл из моей локальной файловой системы, который является разделенным выводом из задания spark. Таким образом , чтобы в .parquet иерархических каталогах были файлы с именем a=x и b=y .

Чтобы достичь этого, я использую pandas.read_parquet (which uses pyarrow.parquet.read_table ), для которого я включаю filters kwarg. Время выполнения использования filters намного больше, чем я ожидал бы.

 # The following runs for about 55 seconds
pd.read_parquet(<path_to_entire_dataset>, filters=[[('a', '=', 'x'), ('b', '=', 'y')]])

# The following runs for about 0.04 seconds
pd.read_parquet(<path_to_entire_dataset>/a=x/b=y/)

# The following runs for about 70 seconds
pd.read_parquet(<path_to_entire_dataset>)
 

Чтение одного файла parquet с указанием фильтров происходит лишь немного быстрее, чем загрузка всего набора данных, где я ожидал бы, что время выполнения будет примерно линейным по количеству файлов.

Какую ошибку я здесь совершаю?

Я понимаю, что простое размещение фильтров в пути будет работать, однако это быстро станет сложным, поскольку то, что я хочу отфильтровать, будет / может измениться. Кроме того, я думаю read_table , что должен иметь возможность эффективно загружать эти данные.

PS: Весь набор данных содержит много миллионов строк, данные, которые я хочу загрузить, составляют всего несколько тысяч строк.

Правка 1:

Как было предложено 0x26res, я вручную определил разделение, это привело к значительному ускорению, но все же не так сильно, как я ожидал. В этой ситуации время выполнения составляло около 5 секунд.

 partitioning = HivePartitioning(
    pa.schema([
        pa.field('a', pa.string()),
        pa.field('b', pa.int32()),
    ])
)

pd.read_parquet(
    <path_to_entire_dataset>,
    engine='pyarrow',
    filters=[
        [
            ('a', '=', x),
            ('b', '=', y),
        ]
    ],
    partitioning=partitioning
)
 

Комментарии:

1. Я сделал более ранний комментарий, но, думаю, я неправильно истолковал ваш вопрос. Ваш набор данных разделен spark, поэтому имена каталогов форматируются как a=x и b=y ?

2. Да, на всякий случай, если было неясно, я обновил описание.

3. Можете ли вы помочь мне немного лучше понять производительность? Сколько файлов соответствовало этому фильтру? Насколько велики были эти файлы? Насколько большой была несжатая таблица, прочитанная этими файлами?

4. Существует ровно один файл, который соответствует этому фильтру. Существует около 100_000 файлов, каждый размером до 100 КБ. Несжатая таблица составляла около 10 ГБ. Как я уже говорил в своем вопросе, я мог бы, конечно, указать нужные мне файлы в path и, возможно, загрузить несколько нужных мне файлов и объединить их с помощью pd.concat . Однако я хочу понять, почему это происходит не так быстро, как я ожидаю. Я думал, что это одно из основных преимуществ parquet: вы загружаете нужные вам данные, а не больше.

5. есть ли какие-либо обновления или решения по этой проблеме, которыми вы можете поделиться? Был бы очень признателен!

Ответ №1:

Учитывая время выполнения, я подозреваю, что arrow открывает все файлы, а затем фильтрует.

Может быть, вы можете попробовать указать разделение, чтобы стрелка могла быть умнее в этом:

 import pyarrow as pa

partitioning = pa.dataset.HivePartitioning(
        pa.schema([
            pa.field('a', pa.string()),
            pa.field('b', pa.string())
        ])
    )

pd.read_parquet(<path_to_entire_dataset>, filters=[[('a', '=', 'x'), ('b', '=', 'y')]], partitioning=partitioning)
 

Комментарии:

1. Я не знал, что мне нужно будет определить разделение таким образом. Когда я это сделал, это привело к значительному ускорению, чтение файла теперь занимает всего 5 секунд. Это все еще не так быстро, как я ожидал. Я буду обновлять свой вопрос, однако пока не буду проверять ваш ответ.

2. Он должен быть обнаружен автоматически. pd.read_parquet вызовы pyarrow.parquet.read_table и поведение секционирования по умолчанию должны заключаться в обнаружении разделов в стиле улья (т. Е. Тех, Которые у вас есть). Тот факт, что вы должны указать это, означает, что обнаружение не выполняется. Если бы вы могли создать воспроизводимый пример и отправить его в Arrow JIRA , это было бы полезно.