Разнесите JSON в PysparkSQL

#json #apache-spark #pyspark #apache-spark-sql

Вопрос:

Я ищу, чтобы преобразовать вложенный json в CSV-файл. Ищу, чтобы разобрать вложенный json на строки и столбцы.

 from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import Row
df=spark.read.option("multiline","true").json("sample1.json")
df.printSchema()

root
 |-- pid: struct (nullable = true)
 |    |-- Body: struct (nullable = true)
 |    |    |-- Vendor: struct (nullable = true)
 |    |    |    |-- RC: struct (nullable = true)
 |    |    |    |    |-- Updated_From_Date: string (nullable = true)
 |    |    |    |    |-- Updated_To_Date: string (nullable = true)
 |    |    |    |-- RD: struct (nullable = true)
 |    |    |    |    |-- Supplier: struct (nullable = true)
 |    |    |    |    |    |-- Supplier_Data: struct (nullable = true)
 |    |    |    |    |    |    |-- Days: long (nullable = true)
 |    |    |    |    |    |    |-- Reference: struct (nullable = true)
 |    |    |    |    |    |    |    |-- ID: array (nullable = true)
 |    |    |    |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |    |    |-- Expected: long (nullable = true)
 |    |    |    |    |    |    |-- Payments: long (nullable = true)
 |    |    |    |    |    |    |-- Approval: struct (nullable = true)
 |    |    |    |    |    |    |    |-- ID: array (nullable = true)
 |    |    |    |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |    |    |-- Areas_Changed: struct (nullable = true)
 |    |    |    |    |    |    |    |-- Alternate_Names: long (nullable = true)
 |    |    |    |    |    |    |    |-- Attachments: long (nullable = true)
 |    |    |    |    |    |    |    |-- Classifications: long (nullable = true)
 |    |    |    |    |    |    |    |-- Contact_Information: long (nullable = true)
 

Мой Код:

 df2=(df.select(F.explode("pid").alias('pid'))
         .select('pid.*')
         .select(F.explode('Body').alias('Body'))
         .select('Body.*')
         .select((F.explode('Vendor').alias('Vendor'))
         .select('Vendor.*')
         .select((F.explode('RC').alias('RC'))
         .select('RC.*'))))
 

Ошибка:
Исключение AnalysisException: не удается разрешить «взрыв(pid)» из-за несоответствия типов данных: ввод для функции «взрыв» должен быть типом массива или карты, а не структурой<Тело:структура< …..

Как я могу проанализировать поля структуры. любая помощь будет очень признательна 🙂

Ответ №1:

Вы можете использовать explode функцию только для карты или типа массива. Для доступа к типу strcut просто используйте . оператор.

Допустим, вы хотите получить столбцы под RC и RD, тогда синтаксис кода должен быть таким, как показано ниже.

 df.select("pid.Body.Vendor.RC.*", "pid.Body.Vendor.RD.*")
 

Комментарии:

1. Это сработало для меня, но не могли бы вы подсказать мне, как у меня может быть отдельное имя столбца для массива. Например, RD.Поставщик. Supplier_Data.Ссылка. Идентификатор в этом случае, если есть 2 элемента, то как разделить их на 2 разных столбца? -Спасибо 🙂

2. Допустим, идентификатор столбца массива, содержащий 2 элемента, вы хотите создать 2 разных столбца, тогда вы можете сделать это с помощью индекса массива. df.selectExpr("RD.Supplier.Supplier_Data.Reference.ID[0] as array1","RD.Supplier.Supplier_Data.Reference.ID[1] as array2") или df.select(col("RD.Supplier.Supplier_Data.Reference.ID").getItem(0).as("array1"))