#apache-spark #pyspark #apache-spark-sql #pyspark-dataframes
#apache-spark #apache-spark-sql #pyspark
Вопрос:
У меня есть фрейм данных со столбцами col1
и col2
где col2
может содержаться строка JSON или обычная строка. Если он содержит анализируемую строку JSON, мне нужно извлечь ключи и значения для разделения столбцов в виде списков, иначе он должен возвращать пустой список в качестве третьей и четвертой строки.
Для достижения этой цели я использую pyspark. Здесь приветствуется любая помощь.
Исходный фрейм данных:
----- ----------------------------------------------
| col1| col2 |
----- ----------------------------------------------
|a |{"key1":"val1","key2":"val2"} |
|b |{"key5":"val5", "key6":"val6", "key7":"val7"} |
|c |"just a string" |
|d | null |
----------------------------------------------------
Требуемый фрейм данных:
----- ---------------- ----------------
| col1| keys | values |
----- ---------------- ---------------
|a |[key1,key2] |[val1,val2] |
|b |[key5,key6,key7]|[val5,val6,val7]|
|c |[] |[] |
|d |[] |[] |
----- ---------------- ----------------
Комментарии:
1. Это очень специфический случай, и вам понадобится индивидуальное решение. Вы должны создать UDF, который принимает String и возвращает 2 списка. Он должен 1) подтвердить JSON 2) если недопустимо, вернуть два пустых списка 3) если допустимо, вернуть список ключей и значений (json может быть проанализирован как python
dict
с помощьюjson
модуля, и вы можете сделать.keys()
и.values()
на нем)2. Спасибо @ Samir Vyas за ваш ответ. Это было полезно
Ответ №1:
Старый вопрос, но мне не очень нравятся другие ответы, которые предлагают использовать UDF для этого.
Для Spark 2.2 вам следует использовать from_json
функцию для преобразования строк json в тип map, затем использовать map_keys
функцию для получения ключей и map_values
функцию для получения значений:
from pyspark.sql.functions import from_json, map_keys, map_values
df1 = df.withColumn('col2', from_json('col2', 'map<string,string>'))
.withColumn('keys', map_keys('col2'))
.withColumn('values', map_values('col2'))
.select('col1', 'keys', 'values')
# ---- ------------------ ------------------
#|col1|keys |values |
# ---- ------------------ ------------------
#|a |[key1, key2] |[val1, val2] |
#|b |[key5, key6, key7]|[val5, val6, val7]|
#|c |null |null |
#|d |null |null |
# ---- ------------------ ------------------
Ответ №2:
Ключи в jsonpath — это $[*~]
, значения — это $[*]
. Но не похоже, что это поддерживается get_json_object
.
Итак, нам нужны пользовательские функции:
def json_keys(s):
import json
try:
data = json.loads(s)
return list(data.keys())
except:
return None
spark.udf.register("json_keys", json_keys)
def json_values(s):
import json
try:
data = json.loads(s)
return list(data.values())
except:
return None
spark.udf.register("json_values", json_values)
df.selectExpr("col1", "json_keys(col2) keys", "json_values(col2) values").collect()
Что дает:
---- ------------ ------------
|col1| keys| values|
---- ------------ ------------
| a|[key1, key2]|[val1, val2]|
| b|[key5, key6]|[val7, val6]|
| c| null| null|
| d| null| null|
---- ------------ ------------
Комментарии:
1. Большое спасибо @Mathieu Longtin за ваш ответ. Решение полезно, интересно, что я также пробовал аналогичный подход, но было любопытно, есть ли способ решить это без udf, и, судя по вашим комментариям, это может быть не так. Я принял ваш ответ, но у меня нет репутации, чтобы проголосовать за ответ
2. Я должен сказать, что мне больше нравится ответ @ blackbishop. Я не знал о
from_json
.
Ответ №3:
Вы можете использовать функцию pyspark explode из модуля sql:
Из документов:
pyspark.sql.functions.explode(col)[источник] Возвращает новую строку для каждого элемента в заданном массиве или карте. Использует имя столбца по умолчанию col для элементов в массиве и ключ и значение для элементов в карте, если не указано иное.
from pyspark.sql import Row
eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
eDF.select(explode(eDF.intlist).alias("anInt")).collect()
[Row(anInt=1), Row(anInt=2), Row(anInt=3)]
eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
--- -----
|key|value|
--- -----
| a| b|
--- -----