Сгладить ключ Json, значения в Pyspark

#pyspark #apache-spark-sql

Вопрос:

В таблице, содержащей 2 столбца и 2 записи :

Запись 1 : Столбец 1 — значение my_col как: {"XXX": ["123","456"],"YYY": ["246","135"]} и столбец 2 — идентификатор как A123

Запись 2 : Столбец 1 — значение my_col как: {"XXX": ["123","456"],"YYY": ["246","135"], "ZZZ":["333","444"]} и столбец 2 — идентификатор как B222

Нужно разобрать/сгладить с помощью pyspark

Ожидание :

Клавиша Ценность ID
ХХХ 123 A123
ХХХ 456 A123
УУУ 246 A123
УУУ 135 A123
ZZZ 333 В222
ZZZ 444 В222

Комментарии:

1. Ты что — нибудь пробовал?

Ответ №1:

Если ваш столбец является строкой, вы можете использовать from_json и custom_schema преобразовать его в a MapType перед использованием explode , чтобы извлечь его в желаемые результаты. Я предположил, что ваш начальный столбец был назван my_col и что ваши данные находились в именованном input_df фрейме данных .

Пример показан ниже

Подход 1: Использование api pyspark

 from pyspark.sql import functions as F
from pyspark.sql import types as T

custom_schema = T.MapType(T.StringType(),T.ArrayType(T.StringType()))

output_df = (
    input_df.select(
        F.from_json(F.col('my_col'),custom_schema).alias('my_col_json')
    )
    .select(F.explode('my_col_json'))
    .select(
        F.col('key'),
        F.explode('value')
    )
)
 

Подход 2: Использование spark sql

 # Step 1 : Create a temporary view that may be queried
input_df.createOrReplaceTempView("input_df")
# Step 2: Run the following sql on your spark session
output_df = sparkSession.sql("""
SELECT
    key,
    EXPLODE(value)
FROM (
    SELECT
        EXPLODE(from_json(my_col,"MAP<STRING,ARRAY<STRING>>"))
    FROM
        input_df
) t
""")
 

Для столбца json

Если уже json

 from pyspark.sql import functions as F

output_df = (
    input_df.select(F.explode('my_col_json'))
    .select(
        F.col('key'),
        F.explode('value')
    )
)
 

или

 # Step 1 : Create a temporary view that may be queried
input_df.createOrReplaceTempView("input_df")
# Step 2: Run the following sql on your spark session
output_df = sparkSession.sql("""
SELECT
    key,
    EXPLODE(value)
FROM (
    SELECT
        EXPLODE(my_col)
    FROM
        input_df
) t
""")
 

Дайте мне знать, если это сработает для вас.

Комментарии:

1. Спасибо, что поделились множеством вариантов, я пытаюсь это сделать.

2. Как я могу реализовать этот сценарий с помощью запроса Hive QL?

3. Вы можете попробовать SQL, описанный в spark-sql приведенных здесь примерах, и сообщить мне, работают ли они для вас.

4. когда я пытаюсь подойти 2 : Ошибка синтаксиса: недопустимый синтаксис , указывающий на КАРТУ

5. при попытке найти столбец json — — — pyspark.sql.utils. Исключение AnalysisException: u»не может разрешить «взрыв(имя таблицы. json_column ) в связи с несоответствие типов данных: Ввод, чтобы функция explode должен быть массив или типа карты, не Строкавведите;;N в игровой форме [взорваться(json_col#3) список()]Н — SubqueryAlias имя_таблицыН — HiveTableRelation DB_name . table_name , орг.Апачи.в Hadoop.улей.serde2.OpenCSVSerde