чтение и преобразование файлов parquet в cloud data fusion

#google-cloud-data-fusion

#google-cloud-data-fusion

Вопрос:

Попытка принять и преобразовать файл parquet в cloud data fusion. Я вижу, что могу использовать файл parquet с помощью плагина GCS. Но когда я хочу преобразовать его с помощью плагина wrangler, я не вижу никакой возможности для этого. Есть ли у плагина wrangler такая возможность вообще или мне следует рассмотреть другой подход? Кстати, я только что развернул свой конвейер, чтобы посмотреть, смогу ли я получить файл parquet из GCS, но я вижу эту ошибку в журналах:

 java.lang.NoClassDefFoundError: org/xerial/snappy/Snappy
    at org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:62) ~[parquet-hadoop-1.8.3.jar:1.8.3]
    at org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51) ~[parquet-hadoop-1.8.3.jar:1.8.3]
    at java.io.DataInputStream.readFully(DataInputStream.java:195) ~[na:1.8.0_275]
    at java.io.DataInputStream.readFully(DataInputStream.java:169) ~[na:1.8.0_275]
    at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:204) ~[parquet-encoding-1.8.3.jar:1.8.3]
    at org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:89) ~[parquet-column-1.8.3.jar:1.8.3]
    at org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:72) ~[parquet-column-1.8.3.jar:1.8.3]
    at org.apache.parquet.column.Encoding$1.initDictionary(Encoding.java:90) ~[parquet-column-1.8.3.jar:1.8.3]
    at org.apache.parquet.column.Encoding$4.initDictionary(Encoding.java:149) ~[parquet-column-1.8.3.jar:1.8.3]
    at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:343) ~[parquet-column-1.8.3.jar:1.8.3]
    at org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:82) ~[parquet-column-1.8.3.jar:1.8.3]
    at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:77) ~[parquet-column-1.8.3.jar:1.8.3]
    at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:270) ~[parquet-column-1.8.3.jar:1.8.3]
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:135) ~[parquet-column-1.8.3.jar:1.8.3]
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:101) ~[parquet-column-1.8.3.jar:1.8.3]
    at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154) ~[parquet-column-1.8.3.jar:1.8.3]
    at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:101) ~[parquet-column-1.8.3.jar:1.8.3]
    at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:140) ~[parquet-hadoop-1.8.3.jar:1.8.3]
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:214) ~[parquet-hadoop-1.8.3.jar:1.8.3]
    at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227) ~[parquet-hadoop-1.8.3.jar:1.8.3]
    at io.cdap.plugin.format.parquet.input.PathTrackingParquetInputFormat$ParquetRecordReader.nextKeyValue(PathTrackingParquetInputFormat.java:76) ~[1614054281928-0/:na]
    at io.cdap.plugin.format.input.PathTrackingInputFormat$TrackingRecordReader.nextKeyValue(PathTrackingInputFormat.java:136) ~[na:na]
    at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper.nextKeyValue(CombineFileRecordReaderWrapper.java:90) ~[hadoop-mapreduce-client-core-2.9.2.jar:na]
    at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:65) ~[hadoop-mapreduce-client-core-2.9.2.jar:na]
    at org.apache.spark.rdd.NewHadoopRDD$anon$1.hasNext(NewHadoopRDD.scala:214) ~[spark-core_2.11-2.3.4.jar:2.3.4]
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) ~[spark-core_2.11-2.3.4.jar:2.3.4]
    at scala.collection.Iterator$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
    at scala.collection.Iterator$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
    at scala.collection.Iterator$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
    at scala.collection.Iterator$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
    at scala.collection.Iterator$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
    at org.apache.spark.internal.io.SparkHadoopWriter$anonfun$4.apply(SparkHadoopWriter.scala:130) ~[spark-core_2.11-2.3.4.jar:2.3.4]
    at org.apache.spark.internal.io.SparkHadoopWriter$anonfun$4.apply(SparkHadoopWriter.scala:129) ~[spark-core_2.11-2.3.4.jar:2.3.4]
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1415) ~[spark-core_2.11-2.3.4.jar:2.3.4]
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$executeTask(SparkHadoopWriter.scala:141) [spark-core_2.11-2.3.4.jar:2.3.4]
    at org.apache.spark.internal.io.SparkHadoopWriter$anonfun$3.apply(SparkHadoopWriter.scala:83) [spark-core_2.11-2.3.4.jar:2.3.4]
    at org.apache.spark.internal.io.SparkHadoopWriter$anonfun$3.apply(SparkHadoopWriter.scala:78) [spark-core_2.11-2.3.4.jar:2.3.4]
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) [spark-core_2.11-2.3.4.jar:2.3.4]
    at org.apache.spark.scheduler.Task.run(Task.scala:109) [spark-core_2.11-2.3.4.jar:2.3.4]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) [spark-core_2.11-2.3.4.jar:2.3.4]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_275]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_275]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_275]
Caused by: java.lang.ClassNotFoundException: org.xerial.snappy.Snappy
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[na:1.8.0_275]
    at io.cdap.cdap.common.lang.InterceptableClassLoader.findClass(InterceptableClassLoader.java:44) ~[na:na]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[na:1.8.0_275]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[na:1.8.0_275]
    ... 43 common frames omitted

 

Нужно ли мне устанавливать определенный модуль в моем кластере (возможно ли это вообще)?

Ответ №1:

Какую графическую версию Dataproc вы использовали?

Похоже, что некоторые версии Dataproc 2.0 не включали библиотеки Snappy в Hadoop.

В качестве обходного пути вы можете скопировать snappy jars в следующие каталоги:

 sudo cp /usr/lib/hive/lib/snappy-java-*.jar /usr/lib/hadoop-mapreduce/
sudo cp /usr/lib/hive/lib/snappy-java-*.jar /usr/lib/hadoop/lib
sudo cp /usr/lib/hive/lib/snappy-java-*.jar /usr/lib/hadoop-yarn/lib
 

Это можно сделать в сценарии инициализации Dataproc [1] с помощью вычислительного профиля Data Fusion следующим образом:

В Data Fusion перейдите к: Системный администратор -> Конфигурация -> Профили системных вычислений -> (создать новый) -> Дополнительные настройки -> Действия инициализации. В противном случае на развернутом конвейере в studio перейдите в меню Настройка -> Конфигурация вычислений -> (выберите профиль) -> Настройка -> Дополнительные настройки -> Действия инициализации.

Примечание: эти параметры недоступны в экземплярах разработчика, только в Basic / Enterprise.

[1] https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/init-actions