#apache-spark #hadoop #hive #hdfs #parquet
#apache-spark #hadoop #улей #hdfs #паркет
Вопрос:
Я использую Spark 2.4.4 для записи в 2-уровневую разделенную внешнюю таблицу улья (формат parquet на HDFS):
CREATE EXTERNAL TABLE mytable (<SCHEMA>)
PARTITIONED BY (`field1` STRING, `field2` STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
)
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://nameservice1/user/....
Схема довольно сложная (много вложенных массивов и структур). Поскольку я вставляю в эту таблицу:
df.write.mode("overwrite").insertInto(myTable)
Время, затрачиваемое на ввод-вывод, увеличивается с каждым заданием. На задание (пакет данных) Я пишу в 5-10 разных field2
разделов (которые пусты перед заданием). Так что я на самом деле только добавляю данные
Начиная с пустой таблицы, запись пакета данных занимает несколько секунд (около ГБ данных), теперь время увеличилось до 30 минут (SparkUI показывает, что все задания завершены, поэтому я предполагаю, что это ввод-вывод, который блокирует выполнение приложения spark). За это время не было записано абсолютно никаких журналов, ни для исполнителей, ни для драйвера.
Я предполагаю, что spark сканирует все существующие разделы для каждого действия перезаписи… но я не уверен.
Я установил hive.exec.dynamic.partition=true
, и spark.sql.sources.partitionOverwriteMode=dynamic
. Остальная часть конфигурации используется по умолчанию.
Комментарии:
1. Вы перезаписываете таблицу для каждого задания? Если да, то какой смысл сохранять ваши данные?
2. возможно, это автоматический сбор статистики улья… Попробуйте установить hive.stats.autogather=false;
3. @leftjoin хорошая идея, но, к сожалению, не помогло
4. @mck нет, я не перезаписываю данные. Как я объяснил в вопросе, я пишу (добавляю) новые разделы с каждым заданием
5. @RaphaelRoth хм, тогда попробуй
.mode("append")
?
Ответ №1:
Вы можете сохранить фрейм данных непосредственно в путь, по которому находятся ваши разделенные данные, и это тот же путь, который упоминается в CREATE TABLE
инструкции Hive
df.write.mode("overwrite").partitionBy("col_specified_for_partitioning").parquet("/path/mentioned/in/create/table")
spark.sql("MSCK REPAIR TABLE dbname.tablename")
Это должно решить проблему, когда вы хотите удалить и заново создать данные для определенного раздела, и MSCK REPAIR TABLE
просто делает таблицу осведомленной о разделах в пути HDFS.
Ответ №2:
Попробуйте
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode(SaveMode.Overwrite).insertInto("table")
Вы также можете попробовать способ @yayati-sule, упомянутый выше, для записи данных, т.Е. Указать целевой каталог напрямую, как показано ниже,
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
df.write.mode(SaveMode.Overwrite).format("parquet").partitionBy("field1", "field2").save("hdfs://nameservice1/user/raw/table/<YYYYMMDDHHMMSS>")
Также вы можете попробовать установить session conf,
sparkSession.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
Или, если это тоже не удается, попробуйте старомодный способ, а затем сделайте alter table add partition
.
df.write.mode(SaveMode.Overwrite).save("hdfs://nameservice1/user/raw/table/field1=val1/field2=val2/")
Любой, кто использует Pre Hadoop-3.3 и S3, используя Hadoop_S3A_client, позже внесет некоторые улучшения в производительность. Итак, обновите.