#scala #apache-spark #apache-spark-sql
#scala #apache-spark #структура #apache-spark-sql #схема
Вопрос:
Spark: 3.0.0
Scala: 2.12.8
В моем фрейме данных есть столбец со строкой JSON, и я хочу создать из него новый столбец с StructType .
temp_json_string |
---|
{«name»:»test»,»id»:»12″,»category»:[{«products»:[«A»,»B»],»displayName»:»test_1″,»displayLabel»:»test1″},{«products»:[«C»],»DisplayName»:»test_2″,»displayLabel»:»test2″}],»createdAt»:»»,»CreatedBy»:»»} |
root
|-- temp_json_string: string (nullable = true)
Форматированный JSON:
{
"name":"test",
"id":"12",
"category":[
{
"products":[
"A",
"B"
],
"displayName":"test_1",
"displayLabel":"test1"
},
{
"products":[
"C"
],
"displayName":"test_2",
"displayLabel":"test2"
}
],
"createdAt":"",
"createdBy":""
}
Я хочу создать новый столбец типа Struct, поэтому я попытался:
dataFrame
.withColumn("temp_json_struct", struct(col("temp_json_string")))
.select("temp_json_struct")
Теперь я получаю схему в виде:
root
|-- temp_json_struct: struct (nullable = false)
| |-- temp_json_string: string (nullable = true)
Желаемый результат:
root
|-- temp_json_struct: struct (nullable = false)
| |-- name: string (nullable = true)
| |-- category: array (nullable = true)
| | |-- products: array (nullable = true)
| | |-- displayName: string (nullable = true)
| | |-- displayLabel: string (nullable = true)
| |-- createdAt: timestamp (nullable = true)
| |-- updatedAt: timestamp (nullable = true)
Ответ №1:
json_str_col — это столбец, содержащий строку JSON. У меня было несколько файлов, поэтому первая строка перебирает каждую строку для извлечения схемы. Если вы заранее знаете свою схему, просто замените json_schema
ее на эту.
json_schema = spark.read.json(df.rdd.map(lambda row: row.json_str_col)).schema
df = df.withColumn('new_col', from_json(col('json_str_col'), json_schema))
Ответ №2:
// import spark implicits for conversion to dataset (.as[String])
import spark.implicits._
val df = ??? //create your dataframe having the 'temp_json_string' column
//convert Dataset[Row] aka Dataframe to Dataset[String]
val ds = df.select("temp_json_string").as[String]
//read as json
spark.read.json(ds)
Комментарии:
1. Пожалуйста, добавьте некоторые пояснения к вашему ответу, чтобы другие могли извлечь из него уроки
2. Объяснение в комментариях. Что еще я должен объяснить
3. Хорошее объяснение может содержать не только код и то, что делает код, но и почему вы решили сделать это именно так, особенно если вопрос уже содержит другие ответы
4. Просто и элегантно. По какой-то причине ни один из других ответов не работал на моем экземпляре (либо отсутствует
lambda
,schema_of_json
либоtoDS
методы), но здесь проблем нет!
Ответ №3:
Существует по крайней мере два разных способа получения / обнаружения схемы для данного JSON.
Для иллюстрации давайте сначала создадим некоторые данные:
import org.apache.spark.sql.types.StructType
val jsData = Seq(
("""{
"name":"test","id":"12","category":[
{
"products":[
"A",
"B"
],
"displayName":"test_1",
"displayLabel":"test1"
},
{
"products":[
"C"
],
"displayName":"test_2",
"displayLabel":"test2"
}
],
"createdAt":"",
"createdBy":""}""")
)
Вариант 1: schema_of_json
Первый вариант — использовать встроенную функцию schema_of_json . Функция вернет схему для данного JSON в формате DDL:
val json = jsData.toDF("js").collect()(0).getString(0)
val ddlSchema: String = spark.sql(s"select schema_of_json('${json}')")
.collect()(0) //get 1st row
.getString(0) //get 1st col of the row as string
.replace("null", "string") //replace type with string, this occurs since you have "createdAt":""
// struct<category:array<struct<displayLabel:string,displayName:string,products:array<string>>>,createdAt:null,createdBy:null,id:string,name:string>
val schema: StructType = StructType.fromDDL(s"js_schema $ddlSchema")
Обратите внимание, что вы ожидаете, что schema_of_json
это также будет работать на уровне столбца, т.е.: schema_of_json(js_col)
, к сожалению, это работает не так, как ожидалось, поэтому мы вынуждены передавать строку вместо этого.
Вариант 2. используйте Spark JSON Reader (рекомендуется)
import org.apache.spark.sql.functions.from_json
val schema: StructType = spark.read.json(jsData.toDS).schema
// schema.printTreeString
// root
// |-- category: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- displayLabel: string (nullable = true)
// | | |-- displayName: string (nullable = true)
// | | |-- products: array (nullable = true)
// | | | |-- element: string (containsNull = true)
// |-- createdAt: string (nullable = true)
// |-- createdBy: string (nullable = true)
// |-- id: string (nullable = true)
// |-- name: string (nullable = true)
Как вы можете видеть, здесь мы создаем схему на основе StructType
, а не строки DDL, как в предыдущем случае.
После обнаружения схемы мы можем перейти к следующему шагу, который преобразует данные JSON в структуру. Для достижения этого мы будем использовать from_json
встроенную функцию:
jsData.toDF("js")
.withColumn("temp_json_struct", from_json($"js", schema))
.printSchema()
// root
// |-- js: string (nullable = true)
// |-- temp_json_struct: struct (nullable = true)
// | |-- category: array (nullable = true)
// | | |-- element: struct (containsNull = true)
// | | | |-- displayLabel: string (nullable = true)
// | | | |-- displayName: string (nullable = true)
// | | | |-- products: array (nullable = true)
// | | | | |-- element: string (containsNull = true)
// | |-- createdAt: string (nullable = true)
// | |-- createdBy: string (nullable = true)
// | |-- id: string (nullable = true)
// | |-- name: string (nullable = true)