Как повысить производительность формата файла csv в parquet с помощью pyspark?

#performance #csv #pyspark #parquet

#Производительность #csv #pyspark #паркет

Вопрос:

У меня есть большой набор данных, который мне нужно преобразовать из csv в формат parquet с помощью pyspark. Существует около 500 ГБ данных, разбросанных по тысячам файлов csv. Моя первоначальная реализация упрощена…

 spark = SparkSession.builder 
    .master("local") 
    .appName("test") 
    .getOrCreate()

df = spark.read.csv(input_files, header=True, inferSchema=True)

df.repartition(1).write.mode('overwrite').parquet(output_dir)
 

Производительность ужасна, я позволил ей работать более 2 часов, прежде чем сдаться. Из выходных данных журнала я делаю вывод, что он даже не завершает чтение файлов csv в фрейм данных.

Я запускаю spark локально на сервере с 128 высокопроизводительными процессорными ядрами и 1 ТБ памяти. Дисковое хранилище основано на SSD с подтвержденной скоростью чтения 650 МБ / с. Моя интуиция заключается в том, что я должен быть в состоянии значительно повысить производительность, учитывая доступные вычислительные ресурсы. Я ищу советы о том, как это сделать.

Я пробовал…

  • не выводя схему, это не привело к заметной разнице в производительности (схема состоит из четырех столбцов текста)
  • используя настройку конфигурации spark.executor.cores , чтобы соответствовать количеству физических ядер на моем сервере. Настройка, похоже, не оказала никакого эффекта, я не наблюдал, чтобы система использовала больше ядер.

На данный момент я застрял в использовании pyspark в соответствии с руководством, но при необходимости я могу убедить их использовать другой инструмент.

Комментарии:

1. Вы уверены, что вам нужен файл Parquet без каких-либо разделов? Наличие секционированных столбцов должно дать некоторое ускорение не только при записи, но и при чтении и выполнении вычислений в будущем.

2. @mck Я немного подумал, каков наилучший способ определить оптимальное количество разделов?

3. Это очень широкий вопрос — Google даст вам много ответов. Это действительно зависит от вашей структуры данных.

4. также предоставьте схему — либо вручную, либо прочитайте один файл CSV по имени и прочитайте его с inferSchema помощью, а затем используйте схему из этого файла для чтения всех файлов

Ответ №1:

Некоторые предложения, основанные на моем опыте работы со spark :

  • Вы не должны выводить схему, если имеете дело с огромными данными. Это может не показать значительного улучшения производительности, но определенно сэкономит вам некоторое время.
  • Не используйте перераспределение (1), поскольку это приведет к перетасовке данных и созданию единого раздела с данными, а это то, чего вы не хотите с огромным объемом имеющихся у вас данных. Я бы посоветовал вам увеличить количество разделов, если это возможно, в зависимости от конфигурации кластера, которую вы имеете, чтобы файлы parquet сохранялись быстрее.
  • Не кэшируйте / сохраняйте свой фрейм данных, если вы просто читаете файлы csv, а затем на следующем шаге сохраняете его как файлы parquet. Это может увеличить ваше время экономии, поскольку само кэширование занимает некоторое время. Кэширование фрейма данных помогло бы, если бы вы выполняли несколько преобразований в фрейме данных, а затем выполняли с ним несколько действий. вы выполняете только одно действие по записи фрейма данных в виде файла parquet, поэтому, по моему мнению, вам не следует кэшировать фрейм данных.

Ответ №2:

Некоторые возможные улучшения :

  • Не используйте .repartition(1) , так как вы теряете параллелизм для операции записи
  • Сохраняйте / кэшируйте фрейм данных перед записью : df.persist()

Если вам действительно нужно сохранить его как 1 файл parquet, вы можете сначала записать во временную папку без уменьшения разделов, а затем использовать coalesce во второй операции записи :

 df = spark.read.csv(input_files, header=True, inferSchema=True).persist()
# ....

df.write.mode('overwrite').parquet("/temp/folder")
df.unpersist()

df1 = spark.read.parquet("/temp/folder")
df1.coalesce(1).write.mode('overwrite').parquet(output_dir)
 

Комментарии:

1. Хороший совет по перераспределению, в моем случае это не обязательно. Я видел это в примере и предположил, что это обязательно.