#apache-spark #hadoop #hive #apache-hudi
#apache-spark #hadoop #улей #apache-hudi
Вопрос:
Я использую Apache Hudi для записи не разделенной таблицы в AWS S3 и синхронизации ее с hive. Вот DataSourceWriteOptions
что используется.
val hudiOptions: Map[String, String] = Map[String, String](
DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "MERGE_ON_READ",
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "PERSON_ID",
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "",
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "UPDATED_DATE",
DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "",
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[NonPartitionedExtractor].getName,
DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true",
DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator"
)
Таблица успешно записывается, если она разделена, но выдает ошибку, если я пытаюсь написать таблицу без разделения. Вот фрагмент вывода ошибки
Caused by: java.lang.NullPointerException
at org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableMetaClientForBasePath(HoodieInputFormatUtils.java:283)
at org.apache.hudi.hadoop.InputPathHandler.parseInputPaths(InputPathHandler.java:100)
at org.apache.hudi.hadoop.InputPathHandler.<init>(InputPathHandler.java:60)
at org.apache.hudi.hadoop.HoodieParquetInputFormat.listStatus(HoodieParquetInputFormat.java:81)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:288)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.rdd.RDD.getNumPartitions(RDD.scala:289)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:83)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:82)
at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.cancel(QueryStageExec.scala:152)
at org.apache.spark.sql.execution.adaptive.MaterializeExecutable.cancel(AdaptiveExecutable.scala:357)
at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.fail(AdaptiveExecutor.scala:280)
... 41 more
Вот код для HoodieInputFormatUtils.getTableMetaClientForBasePath()
/**
* Extract HoodieTableMetaClient from a partition path(not base path).
* @param fs
* @param dataPath
* @return
* @throws IOException
*/
public static HoodieTableMetaClient getTableMetaClientForBasePath(FileSystem fs, Path dataPath) throws IOException {
int levels = HoodieHiveUtils.DEFAULT_LEVELS_TO_BASEPATH;
if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
metadata.readFromFS();
levels = metadata.getPartitionDepth();
}
Path baseDir = HoodieHiveUtils.getNthParent(dataPath, levels);
LOG.info("Reading hoodie metadata from path " baseDir.toString());
return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
}
Строка 283 LOG.info()
— это то, что вызывает исключение NullPointerException. Похоже, что значения конфигурации, предоставленные для разделения, были перепутаны. Этот код выполняется на AWS EMR.
Release label:emr-5.30.1
Hadoop distribution:Amazon 2.8.5
Applications:Hive 2.3.6, Spark 2.4.5
Комментарии:
1. Я проголосовал за это на прошлой неделе, поскольку столкнулся с той же проблемой. Сейчас она у меня работает, и я думаю, что единственные отличия заключаются в том, что я не указываю
HIVE_PARTITION_FIELDS_OPT_KEY
,HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY
, илиHIVE_STYLE_PARTITIONING_OPT_KEY
. Устраняет ли это вашу проблему?
Ответ №1:
Я сомневаюсь, что PARTITIONPATH_FIELD_OPT_KEY и HIVE_PARTITION_FIELDS_OPT_KEY следует оставить неопределенными. Чтобы проверить вашу конфигурацию, я предлагаю перейти к https://doc.hcs.huawei.com/usermanual/mrs/mrs_01_24035.html
Худи.datasource.write.partitionpath.field и толстовка.предполагается, что datasource.hive_sync.partition_fields пустые
Худи.datasource.write.keygenerator.class -> org.apache.hudi.keygen.NonpartitionedKeyGenerator
Худи.datasource.hive_sync.partition_extractor_class->org.apache.hudi.hive.NonPartitionedExtractor
Я столкнулся с проблемой синхронизации улья в PySpark с Hudi 0.9.0, помогла приведенная выше документация.