Как читать и записывать пользовательский класс из файла parquet

#java #apache-spark #apache-spark-sql #spark-dataframe #parquet

#java #apache-spark #apache-spark-sql #spark-dataframe #parquet

Вопрос:

Я пытаюсь написать класс чтения / записи parquet для определенного типа класса, используя DataFrame / datasets

схема классов:

 class A {
  long count;
  List<B> listOfValues;
}
class B {
  String id;
  long count;
}
 

код :

   String path = "some path";
  List<A> entries = somerandomAentries();
  JavaRDD<A> rdd = sc.parallelize(entries, 1);
  DataFrame df = sqlContext.createDataFrame(rdd, A.class);

  df.write().parquet(path);
  DataFrame newDataDF = sqlContext.read().parquet(path);
  newDataDF.show();
 

когда я пытаюсь запустить это, он выдает ошибку. чего мне здесь не хватает? Нужно ли мне предоставлять схему для всего класса при создании фреймов данных
ошибка:

     Caused by: scala.MatchError: B(Id=abc, count=0) (of class B)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:255)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:169)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:153)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
    at org.apache.spark.sql.SQLContext$anonfun$org$apache$spark$sql$SQLContext$beansToRows$1$anonfun$apply$1.apply(SQLContext.scala:1358)
    at org.apache.spark.sql.SQLContext$anonfun$org$apache$spark$sql$SQLContext$beansToRows$1$anonfun$apply$1.apply(SQLContext.scala:1358)
    at scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.sql.SQLContext$anonfun$org$apache$spark$sql$SQLContext$beansToRows$1.apply(SQLContext.scala:1358)
    at org.apache.spark.sql.SQLContext$anonfun$org$apache$spark$sql$SQLContext$beansToRows$1.apply(SQLContext.scala:1356)
    at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:263)
    ... 8 more
 

Комментарии:

1. Какую версию spark вы используете?

2. @abaghel: я использую spark 1.6

Ответ №1:

Вы получаете сообщение об ошибке, потому что вложенные JavaBeans не поддерживаются в версии Spark 1.6. Пожалуйста, смотрите https://spark.apache.org/docs/1.6.0/sql-programming-guide.html#inferring-the-schema-using-reflection

В настоящее время Spark SQL не поддерживает JavaBeans, которые содержат вложенные или содержат сложные типы, такие как списки или массивы.

Комментарии:

1. хорошо, это имеет смысл. Я считаю, что могу использовать кодировщики с spark 1.6 с вложенными сложными типами