spark преобразует фрейм данных в набор данных, используя класс case с полями опций

#scala #apache-spark #apache-spark-sql #apache-spark-dataset

#scala #apache-spark #apache-spark-sql #apache-spark-dataset

Вопрос:

У меня есть следующий класс case:

 case class Person(name: String, lastname: Option[String] = None, age: BigInt) {}
  

И следующий json:

 { "name": "bemjamin", "age" : 1 }
  

Когда я пытаюсь преобразовать свой фрейм данных в набор данных:

 spark.read.json("example.json")
  .as[Person].show()
  

Он показывает мне следующую ошибку:

Исключение в потоке «main» org.apache.spark.sql.AnalysisException: не удается разрешить ‘ lastname ‘ заданные входные столбцы: [возраст, имя];

Мой вопрос: если моя схема является моим классом case и она определяет, что фамилия необязательна, разве as () не должен выполнять преобразование?

Я могу легко исправить это с помощью .map, но я хотел бы знать, есть ли другая более чистая альтернатива этому.

Комментарии:

1. Привет, если вы добавите еще один json (запись 2) с фамилией в нем, если это сработает, то, я думаю, это приведет схему к трем столбцам. В вашем примере только с одной записью, как spark (или кто-либо еще) узнает, что вы собираетесь иметь 3 столбца?

2. @Bemjamin Можете ли вы, пожалуйста, сообщить мне, как бы вы решили это с помощью map? Я пытаюсь решить это для вложенного json, используя предоставленный ответ, но это не работает, поэтому просто хотел получить некоторую подсказку.

Ответ №1:

У нас есть еще один вариант решения вышеуказанной проблемы.Требуется выполнить 2 шага

  1. Убедитесь, что поля, которые могут отсутствовать, объявлены как типы Scala с возможностью обнуления (например, Option[_]).

  2. Предоставьте аргумент схемы и не зависите от вывода схемы.Вы можете использовать, например, использовать Spark SQL Encoder:

     import org.apache.spark.sql.Encoders
    
    val schema = Encoders.product[Person].schema
      

Вы можете обновить код, как показано ниже.

 val schema = Encoders.product[Person].schema

val df = spark.read
           .schema(schema)
           .json("/Users/../Desktop/example.json")
           .as[Person]

 -------- -------- --- 
|    name|lastname|age|
 -------- -------- --- 
|bemjamin|    null|  1|
 -------- -------- --- 
  

Комментарии:

1. можем ли мы сделать что-то подобное для вложенного json?

Ответ №2:

Когда вы выполняете spark.read.json("example.json").as[Person].show() , он в основном считывает фрейм данных как ,

 FileScan json [age#6L,name#7]
  

а затем пытается применить кодировщики для объекта Person, следовательно, получая исключение AnalysisException, поскольку он не может найти lastname из вашего файла json.

Либо вы могли бы намекнуть spark, что lastname является необязательным, предоставив некоторые данные с lastname, либо попробуйте это:

 val schema: StructType = ScalaReflection.schemaFor[Person].dataType.asInstanceOf[StructType]
val x = spark.read
      .schema(schema)
      .json("src/main/resources/json/x.json")
      .as[Person]
 -------- -------- --- 
|    name|lastname|age|
 -------- -------- --- 
|bemjamin|    null|  1|
 -------- -------- --- 
  

Надеюсь, это поможет.