#python #json #dataframe #apache-spark #pyspark
#python #json #фрейм данных #apache-spark #pyspark
Вопрос:
У меня есть фрейм данных в PySpark с 3 столбцами — json, date и object_id:
-----------------------------------------------------------------------------------------
|json |date |object_id|
-----------------------------------------------------------------------------------------
|{'a':{'b':0,'c':{'50':0.005,'60':0,'100':0},'d':0.01,'e':0,'f':2}}|2020-08-01|xyz123 |
|{'a':{'m':0,'n':{'50':0.005,'60':0,'100':0},'d':0.01,'e':0,'f':2}}|2020-08-02|xyz123 |
|{'g':{'h':0,'j':{'50':0.005,'80':0,'100':0},'d':0.02}} |2020-08-03|xyz123 |
-----------------------------------------------------------------------------------------
Теперь у меня есть список переменных: [a.c.60, a.n.60, a.d, g.h]. Мне нужно извлечь только эти переменные из столбца json вышеупомянутого фрейма данных и добавить эти переменные в виде столбцов в фрейм данных с их соответствующими значениями.
Итак, в итоге фрейм данных должен выглядеть следующим образом:
-------------------------------------------------------------------------------------------------------
|json |date |object_id|a.c.60|a.n.60|a.d |g.h|
-------------------------------------------------------------------------------------------------------
|{'a':{'b':0,'c':{'50':0.005,'60':0,'100':0},'d':0.01,...|2020-08-01|xyz123 |0 |null |0.01|null|
|{'a':{'m':0,'n':{'50':0.005,'60':0,'100':0},'d':0.01,...|2020-08-02|xyz123 |null |0 |0.01|null|
|{'g':{'h':0,'j':{'k':0.005,'':0,'100':0},'d':0.01}} |2020-08-03|xyz123 |null |null |0.02|0 |
-------------------------------------------------------------------------------------------------------
Пожалуйста, помогите получить этот результирующий фрейм данных. Основная проблема, с которой я сталкиваюсь, связана с отсутствием фиксированной структуры для входящих данных json. Данные json могут быть любыми во вложенной форме, но мне нужно извлечь только указанные четыре переменные. Я достиг этого в Pandas, сгладив строку json, а затем извлекая 4 переменные, но в Spark это становится затруднительным.
Ответ №1:
Есть 2 способа сделать это:
- используйте
get_json_object
функцию, например, так:
import pyspark.sql.functions as F
df = spark.createDataFrame(['{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
'{"a":{"m":0,"n":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
'{"g":{"h":0,"j":{"50":0.005,"80":0,"100":0},"d":0.02}}'],
StringType())
df3 = df.select(F.get_json_object(F.col("value"), "$.a.c.60").alias("a_c_60"),
F.get_json_object(F.col("value"), "$.a.n.60").alias("a_n_60"),
F.get_json_object(F.col("value"), "$.a.d").alias("a_d"),
F.get_json_object(F.col("value"), "$.g.h").alias("g_h"))
даст:
>>> df3.show()
------ ------ ---- ----
|a_c_60|a_n_60| a_d| g_h|
------ ------ ---- ----
| 0| null|0.01|null|
| null| 0|0.01|null|
| null| null|null| 0|
------ ------ ---- ----
- Объявляйте схему явно (только необходимые поля), преобразуйте JSON в structus с помощью
from_json
функции со схемой, а затем извлекайте отдельные значения из структур — это может быть более производительным, чем JSON Path:
from pyspark.sql.types import *
import pyspark.sql.functions as F
aSchema = StructType([
StructField("c", StructType([
StructField("60", DoubleType(), True)
]), True),
StructField("n", StructType([
StructField("60", DoubleType(), True)
]), True),
StructField("d", DoubleType(), True),
])
gSchema = StructType([
StructField("h", DoubleType(), True)
])
schema = StructType([
StructField("a", aSchema, True),
StructField("g", gSchema, True)
])
df = spark.createDataFrame(['{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
'{"a":{"m":0,"n":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
'{"g":{"h":0,"j":{"50":0.005,"80":0,"100":0},"d":0.02}}'],
StringType())
df2 = df.select(F.from_json("value", schema=schema).alias('data')).select('data.*')
df2.select(df2.a.c['60'], df2.a.n['60'], df2.a.d, df2.g.h).show()
даст
------ ------ ---- ----
|a.c.60|a.n.60| a.d| g.h|
------ ------ ---- ----
| 0.0| null|0.01|null|
| null| 0.0|0.01|null|
| null| null|null| 0.0|
------ ------ ---- ----
Комментарии:
1. Спасибо Алексу Отту, первый метод у меня сработал. Однако я не пробовал второй вариант. Большое спасибо за вашу помощь 🙂
2. Привет @Alex Ott, каким будет код, если фрейм данных будет иметь вид (список словарей): df = spark.createDataFrame([‘[{«a»:{«b»:0,»c»:{«50″:0.005,»60″:0,»100″:0},» d»:0.01,»e»:0,»f»:2}}]’, ‘[{» a»:{«m»:0,»n»:{«50″:0.005,»60″:0,»100″:0},» d»:0.01,»e»:0,»f»:2}}]’, ‘[{» g»:{«h»:0,»j»:{«50″:0.005,»80″:0,»100″:0},» d»:0.02}}]’], StringType())