Запись в разделенную таблицу улья занимает больше времени по мере роста таблицы

#apache-spark #hadoop #hive #hdfs #parquet

#apache-spark #hadoop #улей #hdfs #паркет

Вопрос:

Я использую Spark 2.4.4 для записи в 2-уровневую разделенную внешнюю таблицу улья (формат parquet на HDFS):

 CREATE EXTERNAL TABLE mytable (<SCHEMA>)
PARTITIONED BY (`field1` STRING, `field2` STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
)
STORED AS
  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://nameservice1/user/....
 

Схема довольно сложная (много вложенных массивов и структур). Поскольку я вставляю в эту таблицу:

 df.write.mode("overwrite").insertInto(myTable)
 

Время, затрачиваемое на ввод-вывод, увеличивается с каждым заданием. На задание (пакет данных) Я пишу в 5-10 разных field2 разделов (которые пусты перед заданием). Так что я на самом деле только добавляю данные

Начиная с пустой таблицы, запись пакета данных занимает несколько секунд (около ГБ данных), теперь время увеличилось до 30 минут (SparkUI показывает, что все задания завершены, поэтому я предполагаю, что это ввод-вывод, который блокирует выполнение приложения spark). За это время не было записано абсолютно никаких журналов, ни для исполнителей, ни для драйвера.

Я предполагаю, что spark сканирует все существующие разделы для каждого действия перезаписи… но я не уверен.

Я установил hive.exec.dynamic.partition=true , и spark.sql.sources.partitionOverwriteMode=dynamic . Остальная часть конфигурации используется по умолчанию.

Комментарии:

1. Вы перезаписываете таблицу для каждого задания? Если да, то какой смысл сохранять ваши данные?

2. возможно, это автоматический сбор статистики улья… Попробуйте установить hive.stats.autogather=false;

3. @leftjoin хорошая идея, но, к сожалению, не помогло

4. @mck нет, я не перезаписываю данные. Как я объяснил в вопросе, я пишу (добавляю) новые разделы с каждым заданием

5. @RaphaelRoth хм, тогда попробуй .mode("append") ?

Ответ №1:

Вы можете сохранить фрейм данных непосредственно в путь, по которому находятся ваши разделенные данные, и это тот же путь, который упоминается в CREATE TABLE инструкции Hive

 df.write.mode("overwrite").partitionBy("col_specified_for_partitioning").parquet("/path/mentioned/in/create/table")

spark.sql("MSCK REPAIR TABLE dbname.tablename")
 

Это должно решить проблему, когда вы хотите удалить и заново создать данные для определенного раздела, и MSCK REPAIR TABLE просто делает таблицу осведомленной о разделах в пути HDFS.

Ответ №2:

Попробуйте

 spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode(SaveMode.Overwrite).insertInto("table")
 

Вы также можете попробовать способ @yayati-sule, упомянутый выше, для записи данных, т.Е. Указать целевой каталог напрямую, как показано ниже,

 spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
df.write.mode(SaveMode.Overwrite).format("parquet").partitionBy("field1", "field2").save("hdfs://nameservice1/user/raw/table/<YYYYMMDDHHMMSS>")
 

Также вы можете попробовать установить session conf,

 sparkSession.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
 

Или, если это тоже не удается, попробуйте старомодный способ, а затем сделайте alter table add partition .

 df.write.mode(SaveMode.Overwrite).save("hdfs://nameservice1/user/raw/table/field1=val1/field2=val2/")
 

Любой, кто использует Pre Hadoop-3.3 и S3, используя Hadoop_S3A_client, позже внесет некоторые улучшения в производительность. Итак, обновите.