PySpark: считывает вложенный JSON из столбца строкового типа и создает столбцы

#python #json #dataframe #apache-spark #pyspark

#python #json #фрейм данных #apache-spark #pyspark

Вопрос:

У меня есть фрейм данных в PySpark с 3 столбцами — json, date и object_id:

 -----------------------------------------------------------------------------------------
|json                                                              |date      |object_id|
-----------------------------------------------------------------------------------------
|{'a':{'b':0,'c':{'50':0.005,'60':0,'100':0},'d':0.01,'e':0,'f':2}}|2020-08-01|xyz123   |
|{'a':{'m':0,'n':{'50':0.005,'60':0,'100':0},'d':0.01,'e':0,'f':2}}|2020-08-02|xyz123   |
|{'g':{'h':0,'j':{'50':0.005,'80':0,'100':0},'d':0.02}}            |2020-08-03|xyz123   |
-----------------------------------------------------------------------------------------
  

Теперь у меня есть список переменных: [a.c.60, a.n.60, a.d, g.h]. Мне нужно извлечь только эти переменные из столбца json вышеупомянутого фрейма данных и добавить эти переменные в виде столбцов в фрейм данных с их соответствующими значениями.

Итак, в итоге фрейм данных должен выглядеть следующим образом:

 -------------------------------------------------------------------------------------------------------
|json                                                    |date      |object_id|a.c.60|a.n.60|a.d |g.h|
-------------------------------------------------------------------------------------------------------
|{'a':{'b':0,'c':{'50':0.005,'60':0,'100':0},'d':0.01,...|2020-08-01|xyz123   |0     |null  |0.01|null|
|{'a':{'m':0,'n':{'50':0.005,'60':0,'100':0},'d':0.01,...|2020-08-02|xyz123   |null  |0     |0.01|null|
|{'g':{'h':0,'j':{'k':0.005,'':0,'100':0},'d':0.01}}     |2020-08-03|xyz123   |null  |null  |0.02|0   |
-------------------------------------------------------------------------------------------------------
  

Пожалуйста, помогите получить этот результирующий фрейм данных. Основная проблема, с которой я сталкиваюсь, связана с отсутствием фиксированной структуры для входящих данных json. Данные json могут быть любыми во вложенной форме, но мне нужно извлечь только указанные четыре переменные. Я достиг этого в Pandas, сгладив строку json, а затем извлекая 4 переменные, но в Spark это становится затруднительным.

Ответ №1:

Есть 2 способа сделать это:

  1. используйте get_json_object функцию, например, так:
 import pyspark.sql.functions as F

df = spark.createDataFrame(['{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                            '{"a":{"m":0,"n":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                            '{"g":{"h":0,"j":{"50":0.005,"80":0,"100":0},"d":0.02}}'],
                           StringType())

df3 = df.select(F.get_json_object(F.col("value"), "$.a.c.60").alias("a_c_60"),
                F.get_json_object(F.col("value"), "$.a.n.60").alias("a_n_60"),
                F.get_json_object(F.col("value"), "$.a.d").alias("a_d"),
                F.get_json_object(F.col("value"), "$.g.h").alias("g_h"))
  

даст:

 >>> df3.show()
 ------ ------ ---- ---- 
|a_c_60|a_n_60| a_d| g_h|
 ------ ------ ---- ---- 
|     0|  null|0.01|null|
|  null|     0|0.01|null|
|  null|  null|null|   0|
 ------ ------ ---- ---- 
  
  1. Объявляйте схему явно (только необходимые поля), преобразуйте JSON в structus с помощью from_json функции со схемой, а затем извлекайте отдельные значения из структур — это может быть более производительным, чем JSON Path:
 from pyspark.sql.types import *
import pyspark.sql.functions as F

aSchema = StructType([
    StructField("c", StructType([
        StructField("60", DoubleType(), True)
    ]), True),
    StructField("n", StructType([
        StructField("60", DoubleType(), True)
    ]), True),
    StructField("d", DoubleType(), True),
])
gSchema = StructType([
    StructField("h", DoubleType(), True)
])

schema = StructType([
    StructField("a", aSchema, True),
    StructField("g", gSchema, True)
])

df = spark.createDataFrame(['{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                            '{"a":{"m":0,"n":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                            '{"g":{"h":0,"j":{"50":0.005,"80":0,"100":0},"d":0.02}}'],
                           StringType())

df2 = df.select(F.from_json("value", schema=schema).alias('data')).select('data.*')
df2.select(df2.a.c['60'], df2.a.n['60'], df2.a.d, df2.g.h).show()
  

даст

  ------ ------ ---- ---- 
|a.c.60|a.n.60| a.d| g.h|
 ------ ------ ---- ---- 
|   0.0|  null|0.01|null|
|  null|   0.0|0.01|null|
|  null|  null|null| 0.0|
 ------ ------ ---- ---- 
  

Комментарии:

1. Спасибо Алексу Отту, первый метод у меня сработал. Однако я не пробовал второй вариант. Большое спасибо за вашу помощь 🙂

2. Привет @Alex Ott, каким будет код, если фрейм данных будет иметь вид (список словарей): df = spark.createDataFrame([‘[{«a»:{«b»:0,»c»:{«50″:0.005,»60″:0,»100″:0},» d»:0.01,»e»:0,»f»:2}}]’, ‘[{» a»:{«m»:0,»n»:{«50″:0.005,»60″:0,»100″:0},» d»:0.01,»e»:0,»f»:2}}]’, ‘[{» g»:{«h»:0,»j»:{«50″:0.005,»80″:0,»100″:0},» d»:0.02}}]’], StringType())