Как извлечь ключ и значение в виде отдельных столбцов из столбца строки JSON

#apache-spark #pyspark #apache-spark-sql #pyspark-dataframes

#apache-spark #apache-spark-sql #pyspark

Вопрос:

У меня есть фрейм данных со столбцами col1 и col2 где col2 может содержаться строка JSON или обычная строка. Если он содержит анализируемую строку JSON, мне нужно извлечь ключи и значения для разделения столбцов в виде списков, иначе он должен возвращать пустой список в качестве третьей и четвертой строки.

Для достижения этой цели я использую pyspark. Здесь приветствуется любая помощь.

Исходный фрейм данных:

  ----- ---------------------------------------------- 
| col1|       col2                                   |
 ----- ---------------------------------------------- 
|a    |{"key1":"val1","key2":"val2"}                 |
|b    |{"key5":"val5", "key6":"val6", "key7":"val7"} |
|c    |"just a string"                               |
|d    | null                                         |
 ---------------------------------------------------- 
  

Требуемый фрейм данных:

  ----- ---------------- ---------------- 
| col1|   keys         |   values       |
 ----- ---------------- --------------- 
|a    |[key1,key2]     |[val1,val2]     |
|b    |[key5,key6,key7]|[val5,val6,val7]|
|c    |[]              |[]              |
|d    |[]              |[]              |
 ----- ---------------- ---------------- 
  

Комментарии:

1. Это очень специфический случай, и вам понадобится индивидуальное решение. Вы должны создать UDF, который принимает String и возвращает 2 списка. Он должен 1) подтвердить JSON 2) если недопустимо, вернуть два пустых списка 3) если допустимо, вернуть список ключей и значений (json может быть проанализирован как python dict с помощью json модуля, и вы можете сделать .keys() и .values() на нем)

2. Спасибо @ Samir Vyas за ваш ответ. Это было полезно

Ответ №1:

Старый вопрос, но мне не очень нравятся другие ответы, которые предлагают использовать UDF для этого.

Для Spark 2.2 вам следует использовать from_json функцию для преобразования строк json в тип map, затем использовать map_keys функцию для получения ключей и map_values функцию для получения значений:

 from pyspark.sql.functions import from_json, map_keys, map_values

df1 = df.withColumn('col2', from_json('col2', 'map<string,string>')) 
        .withColumn('keys', map_keys('col2')) 
        .withColumn('values', map_values('col2')) 
        .select('col1', 'keys', 'values') 

# ---- ------------------ ------------------ 
#|col1|keys              |values            |
# ---- ------------------ ------------------ 
#|a   |[key1, key2]      |[val1, val2]      |
#|b   |[key5, key6, key7]|[val5, val6, val7]|
#|c   |null              |null              |
#|d   |null              |null              |
# ---- ------------------ ------------------ 
  

Ответ №2:

Ключи в jsonpath — это $[*~] , значения — это $[*] . Но не похоже, что это поддерживается get_json_object .

Итак, нам нужны пользовательские функции:

 def json_keys(s):
     import json
     try:
         data = json.loads(s)
         return list(data.keys())
     except:
         return None
spark.udf.register("json_keys", json_keys)

def json_values(s):
     import json
     try:
         data = json.loads(s)
         return list(data.values())
     except:
         return None
spark.udf.register("json_values", json_values)

df.selectExpr("col1", "json_keys(col2) keys", "json_values(col2) values").collect()
  

Что дает:

  ---- ------------ ------------ 
|col1|        keys|      values|
 ---- ------------ ------------ 
|   a|[key1, key2]|[val1, val2]|
|   b|[key5, key6]|[val7, val6]|
|   c|        null|        null|
|   d|        null|        null|
 ---- ------------ ------------ 
  

Комментарии:

1. Большое спасибо @Mathieu Longtin за ваш ответ. Решение полезно, интересно, что я также пробовал аналогичный подход, но было любопытно, есть ли способ решить это без udf, и, судя по вашим комментариям, это может быть не так. Я принял ваш ответ, но у меня нет репутации, чтобы проголосовать за ответ

2. Я должен сказать, что мне больше нравится ответ @ blackbishop. Я не знал о from_json .

Ответ №3:

Вы можете использовать функцию pyspark explode из модуля sql:
Из документов:

pyspark.sql.functions.explode(col)[источник] Возвращает новую строку для каждого элемента в заданном массиве или карте. Использует имя столбца по умолчанию col для элементов в массиве и ключ и значение для элементов в карте, если не указано иное.

 from pyspark.sql import Row
eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
eDF.select(explode(eDF.intlist).alias("anInt")).collect()
[Row(anInt=1), Row(anInt=2), Row(anInt=3)]

eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
 --- ----- 
|key|value|
 --- ----- 
|  a|    b|
 --- -----