Не удается записать данные в таблицу с помощью Apache Iceberg

#java #iceberg

#java #iceberg

Вопрос:

я пытаюсь записать простые данные в таблицу с помощью Apache Iceberg 0.9.1, но появляются сообщения об ошибках. Я хочу напрямую обрабатывать данные с помощью Hadoop. я создаю hadooptable и пытаюсь прочитать из таблицы. после этого я пытаюсь записать данные в таблицу. я готовлю файл json, содержащий одну строку. мой код прочитал объект json и упорядочил порядок данных, но на последнем этапе записи данных всегда возникает ошибка. я изменил некоторые версии пакетов зависимостей, но отображаются другие сообщения об ошибках. Что-то не так с версией пакетов. Пожалуйста, помогите мне.

это мой исходный код:

 
public class IcebergTest {

    public static void main(String[] args) {
        testWithoutCatalog();
        readDataWithouCatalog();
        writeDataWithoutCatalog();

    }

    public static void testWithoutCatalog() {

        Schema bookSchema = new Schema(optional(1, "title", Types.StringType.get()),
                optional(2, "price", Types.LongType.get()), 
                optional(3, "author", Types.StringType.get()),               
                optional(4, "genre", Types.StringType.get()));
        PartitionSpec bookspec = PartitionSpec.builderFor(bookSchema).identity("title").build();

        Configuration conf = new Configuration();
        
        String warehousePath = "hdfs://hadoop01:9000/warehouse_path/xgfying/books3";

        HadoopTables tables = new HadoopTables(conf);
        Table table = tables.create(bookSchema, bookspec, warehousePath);
    }

    public static void readDataWithouCatalog(){
        .......
    }

    public static void writeDataWithoutCatalog(){
        SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();
        Dataset<Row> df = spark.read().json("src/test/data/books3.json");       
        System.out.println(" this is the writing data : " df.select("title","price","author","genre")
                                                            .first().toString());
        df.select("title","price","author","genre")
          .write().format("iceberg").mode("append")
          .save("hdfs://hadoop01:9000/warehouse_path/xgfying/books3");
        // System.out.println(df.write().format("iceberg").mode("append").toString());
    }

}
  

это сообщения об ошибках:

 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/11/18 15:51:36 INFO SparkContext: Running Spark version 2.4.5
.......
file:///C:/tmp/icebergtest1/src/test/data/books3.json, range: 0-75, partition values: [empty row]
20/11/18 15:51:52 ERROR Utils: Aborting task
java.lang.ExceptionInInitializerError
        at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:232)
        at org.apache.iceberg.spark.source.SparkAppenderFactory.newAppender(SparkAppenderFactory.java:61)
        at org.apache.iceberg.spark.source.BaseWriter.openCurrent(BaseWriter.java:105)
        at org.apache.iceberg.spark.source.PartitionedWriter.write(PartitionedWriter.java:63)
        at org.apache.iceberg.spark.source.Writer$Partitioned24Writer.write(Writer.java:271)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:118)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Cannot find constructor for interface org.apache.parquet.column.page.PageWriteStore       
        Missing org.apache.parquet.hadoop.ColumnChunkPageWriteStore(org.apache.parquet.hadoop.CodecFactory$BytesCompressor,org.apache.parquet.schema.MessageType,org.apache.parquet.bytes.ByteBufferAllocator,int) [java.lang.NoSuchMethodException: org.apache.parquet.hadoop.ColumnChunkPageWriteStore.<init>(org.apache.parquet.hadoop.CodecFactory$BytesCompressor, org.apache.parquet.schema.MessageType, org.apache.parquet.bytes.ByteBufferAllocator, int)]
        at org.apache.iceberg.common.DynConstructors$Builder.build(DynConstructors.java:235)
        at org.apache.iceberg.parquet.ParquetWriter.<clinit>(ParquetWriter.java:55)
        ... 19 more
20/11/18 15:51:52 ERROR DataWritingSparkTask: Aborting commit for partition 0 (task 2, attempt 0, stage 2.0)
20/11/18 15:51:52 ERROR DataWritingSparkTask: Aborted commit for partition 0 (task 2, attempt 0, stage 2.0)

  

это мой pom.xml:

 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>icebergtest</groupId>
  <artifactId>icebergtest1</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>icebergtest1</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <iceberg.version>0.9.1</iceberg.version>
        <hadoop.version>2.7.0</hadoop.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    
          <!-- org.apache.hadoop BEGIN-->
      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>${hadoop.version}</version>
      </dependency>

      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-hdfs</artifactId>
          <version>${hadoop.version}</version>
      </dependency>

      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>${hadoop.version}</version>
          
    <!--将netty包排除-->
      <exclusions>
        <exclusion>
          <groupId>io.netty</groupId>
          <artifactId>netty</artifactId>
        </exclusion>
      </exclusions>
          
      </dependency>
      
      <!--解决io.netty.buffer.PooledByteBufAllocator.defaultNumHeapArena()I异常,-->
      <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
          <version>4.1.18.Final</version>
      </dependency>

      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-core</artifactId>
          <version>${hadoop.version}</version>
      </dependency>
      
          <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-auth</artifactId>
          <version>${hadoop.version}</version>
      </dependency>
      
          <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
          <version>${hadoop.version}</version>
      </dependency>
      <!-- org.apache.hadoop END-->

      <!-- org.apache.iceberg BEGIN-->
      <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-core</artifactId>
        <version>${iceberg.version}</version>
      </dependency>


      <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-api</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-parquet</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
     

      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-common</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-orc</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-data</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-hive</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-arrow</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-spark</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-bundled-guava</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-spark-runtime</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-spark2</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-flink</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-pig</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
      <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-mr</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      <!-- org.apache.iceberg END-->
      
      
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>2.4.5</version>
            <exclusions>
                <exclusion>
                    <groupId>org.codehaus.janino</groupId>
                    <artifactId>commons-compiler</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
        
        
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <!--<version>2.7.9</version>-->
            <version>2.6.6</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <!--<version>2.7.9.4</version>-->
            <version>2.6.5</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <!--<version>2.7.9</version>-->
            <version>2.6.5</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>
        
        <dependency>
           <groupId>org.apache.parquet</groupId>
           <artifactId>parquet-avro</artifactId>
           <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
           <groupId>org.apache.parquet</groupId>
           <artifactId>parquet-column</artifactId>
           <version>1.11.1</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.4.0</version>
            <scope>provided</scope>
        </dependency>



    
  </dependencies>
</project>
  

Комментарии:

1. можете ли вы показать свои зависимости? Возможно, у вас есть некоторые зависимости в <compile>

2. Спасибо! я добавляю pom.xml , пожалуйста, помогите.

Ответ №1:

Отсутствует org.apache.parquet.hadoop.ColumnChunkPageWriteStore(org.apache.parquet.hadoop.CodecFactory $BytesCompressor,org.apache.parquet.schema.MessageType,org.apache.parquet.bytes.ByteBufferAllocator,int) [java.lang.Исключение NoSuchMethodException: org.apache.parquet.hadoop.ColumnChunkPageWriteStore.(org.apache.parquet.hadoop.CodecFactory $BytesCompressor, org.apache.parquet.schema.MessageType, org.apache.parquet.bytes.ByteBufferAllocator, int)]

Означает, что вы используете конструктор ColumnChunkPageWriteStore, который принимает 4 параметра типов (org.apache.parquet.hadoop.CodecFactory $BytesCompressor, org.apache.parquet.schema.MessageType, org.apache.parquet.bytes.ByteBufferAllocator, int)

Он не может найти конструктор, который вы используете. Вот почему NoSuchMethodError

Согласно https://jar-download.com/artifacts/org.apache.parquet/parquet-hadoop/1.8.1/source-code/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java , вам нужен 1.8.1 parquet-hadoop

Измените импорт mvn на более старую версию. Я просмотрел исходный код 1.8.1, и в нем есть нужный вам конструктор.

Комментарии:

1. Спасибо. я добавляю ссылку org.apache.parquet.parquet-hadoop 1.8.1 в pom.xml , код показывает новое сообщение об ошибке: « Provider org.apache.spark.sql.execution.datasources.parquet. Не удалось создать экземпляр ParquetFileFormat …. Вызвано: java.lang. Ошибка NoClassDefFoundError: org/apache/parquet/hadoop/ParquetOutputFormat$JobSummaryLevel …. Вызвано: java.lang. Исключение ClassNotFoundException: org.apache.parquet.hadoop. ParquetOutputFormat$JobSummaryLevel `’ я не знаю, являются ли мои ссылочные пакеты в pom.xml возьмем конфликт.

2. @harryboot каждый раз, когда вы видите такое ClassNotFoundException, это означает, что вам не хватает библиотек или у вас есть библиотека с версией, в которой отсутствует сигнатура класса / метода.

3. я успешно записываю данные. я добавляю org.apache.parquet.parquet-пакет hadoop, но версию, которую я использовал 1.11.0. Большое вам спасибо.