#java #iceberg
#java #iceberg
Вопрос:
я пытаюсь записать простые данные в таблицу с помощью Apache Iceberg 0.9.1, но появляются сообщения об ошибках. Я хочу напрямую обрабатывать данные с помощью Hadoop. я создаю hadooptable и пытаюсь прочитать из таблицы. после этого я пытаюсь записать данные в таблицу. я готовлю файл json, содержащий одну строку. мой код прочитал объект json и упорядочил порядок данных, но на последнем этапе записи данных всегда возникает ошибка. я изменил некоторые версии пакетов зависимостей, но отображаются другие сообщения об ошибках. Что-то не так с версией пакетов. Пожалуйста, помогите мне.
это мой исходный код:
public class IcebergTest {
public static void main(String[] args) {
testWithoutCatalog();
readDataWithouCatalog();
writeDataWithoutCatalog();
}
public static void testWithoutCatalog() {
Schema bookSchema = new Schema(optional(1, "title", Types.StringType.get()),
optional(2, "price", Types.LongType.get()),
optional(3, "author", Types.StringType.get()),
optional(4, "genre", Types.StringType.get()));
PartitionSpec bookspec = PartitionSpec.builderFor(bookSchema).identity("title").build();
Configuration conf = new Configuration();
String warehousePath = "hdfs://hadoop01:9000/warehouse_path/xgfying/books3";
HadoopTables tables = new HadoopTables(conf);
Table table = tables.create(bookSchema, bookspec, warehousePath);
}
public static void readDataWithouCatalog(){
.......
}
public static void writeDataWithoutCatalog(){
SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();
Dataset<Row> df = spark.read().json("src/test/data/books3.json");
System.out.println(" this is the writing data : " df.select("title","price","author","genre")
.first().toString());
df.select("title","price","author","genre")
.write().format("iceberg").mode("append")
.save("hdfs://hadoop01:9000/warehouse_path/xgfying/books3");
// System.out.println(df.write().format("iceberg").mode("append").toString());
}
}
это сообщения об ошибках:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/11/18 15:51:36 INFO SparkContext: Running Spark version 2.4.5
.......
file:///C:/tmp/icebergtest1/src/test/data/books3.json, range: 0-75, partition values: [empty row]
20/11/18 15:51:52 ERROR Utils: Aborting task
java.lang.ExceptionInInitializerError
at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:232)
at org.apache.iceberg.spark.source.SparkAppenderFactory.newAppender(SparkAppenderFactory.java:61)
at org.apache.iceberg.spark.source.BaseWriter.openCurrent(BaseWriter.java:105)
at org.apache.iceberg.spark.source.PartitionedWriter.write(PartitionedWriter.java:63)
at org.apache.iceberg.spark.source.Writer$Partitioned24Writer.write(Writer.java:271)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:118)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Cannot find constructor for interface org.apache.parquet.column.page.PageWriteStore
Missing org.apache.parquet.hadoop.ColumnChunkPageWriteStore(org.apache.parquet.hadoop.CodecFactory$BytesCompressor,org.apache.parquet.schema.MessageType,org.apache.parquet.bytes.ByteBufferAllocator,int) [java.lang.NoSuchMethodException: org.apache.parquet.hadoop.ColumnChunkPageWriteStore.<init>(org.apache.parquet.hadoop.CodecFactory$BytesCompressor, org.apache.parquet.schema.MessageType, org.apache.parquet.bytes.ByteBufferAllocator, int)]
at org.apache.iceberg.common.DynConstructors$Builder.build(DynConstructors.java:235)
at org.apache.iceberg.parquet.ParquetWriter.<clinit>(ParquetWriter.java:55)
... 19 more
20/11/18 15:51:52 ERROR DataWritingSparkTask: Aborting commit for partition 0 (task 2, attempt 0, stage 2.0)
20/11/18 15:51:52 ERROR DataWritingSparkTask: Aborted commit for partition 0 (task 2, attempt 0, stage 2.0)
это мой pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>icebergtest</groupId>
<artifactId>icebergtest1</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>icebergtest1</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<iceberg.version>0.9.1</iceberg.version>
<hadoop.version>2.7.0</hadoop.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<!-- org.apache.hadoop BEGIN-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<!--将netty包排除-->
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--解决io.netty.buffer.PooledByteBufAllocator.defaultNumHeapArena()I异常,-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.18.Final</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- org.apache.hadoop END-->
<!-- org.apache.iceberg BEGIN-->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-parquet</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-common</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-orc</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-data</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-hive</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-arrow</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-bundled-guava</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark2</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-pig</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-mr</artifactId>
<version>${iceberg.version}</version>
</dependency>
<!-- org.apache.iceberg END-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.4.5</version>
<exclusions>
<exclusion>
<groupId>org.codehaus.janino</groupId>
<artifactId>commons-compiler</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.11.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<!--<version>2.7.9</version>-->
<version>2.6.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<!--<version>2.7.9.4</version>-->
<version>2.6.5</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<!--<version>2.7.9</version>-->
<version>2.6.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.56</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
<version>1.11.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Комментарии:
1. можете ли вы показать свои зависимости? Возможно, у вас есть некоторые зависимости в <compile>
2. Спасибо! я добавляю pom.xml , пожалуйста, помогите.
Ответ №1:
Отсутствует org.apache.parquet.hadoop.ColumnChunkPageWriteStore(org.apache.parquet.hadoop.CodecFactory $BytesCompressor,org.apache.parquet.schema.MessageType,org.apache.parquet.bytes.ByteBufferAllocator,int) [java.lang.Исключение NoSuchMethodException: org.apache.parquet.hadoop.ColumnChunkPageWriteStore.(org.apache.parquet.hadoop.CodecFactory $BytesCompressor, org.apache.parquet.schema.MessageType, org.apache.parquet.bytes.ByteBufferAllocator, int)]
Означает, что вы используете конструктор ColumnChunkPageWriteStore, который принимает 4 параметра типов (org.apache.parquet.hadoop.CodecFactory $BytesCompressor, org.apache.parquet.schema.MessageType, org.apache.parquet.bytes.ByteBufferAllocator, int)
Он не может найти конструктор, который вы используете. Вот почему NoSuchMethodError
Согласно https://jar-download.com/artifacts/org.apache.parquet/parquet-hadoop/1.8.1/source-code/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java , вам нужен 1.8.1 parquet-hadoop
Измените импорт mvn на более старую версию. Я просмотрел исходный код 1.8.1, и в нем есть нужный вам конструктор.
Комментарии:
1. Спасибо. я добавляю ссылку org.apache.parquet.parquet-hadoop 1.8.1 в pom.xml , код показывает новое сообщение об ошибке: « Provider org.apache.spark.sql.execution.datasources.parquet. Не удалось создать экземпляр ParquetFileFormat …. Вызвано: java.lang. Ошибка NoClassDefFoundError: org/apache/parquet/hadoop/ParquetOutputFormat$JobSummaryLevel …. Вызвано: java.lang. Исключение ClassNotFoundException: org.apache.parquet.hadoop. ParquetOutputFormat$JobSummaryLevel `’ я не знаю, являются ли мои ссылочные пакеты в pom.xml возьмем конфликт.
2. @harryboot каждый раз, когда вы видите такое ClassNotFoundException, это означает, что вам не хватает библиотек или у вас есть библиотека с версией, в которой отсутствует сигнатура класса / метода.
3. я успешно записываю данные. я добавляю org.apache.parquet.parquet-пакет hadoop, но версию, которую я использовал 1.11.0. Большое вам спасибо.