#pyspark #apache-spark-sql
Вопрос:
В таблице, содержащей 2 столбца и 2 записи :
Запись 1 : Столбец 1 — значение my_col как: {"XXX": ["123","456"],"YYY": ["246","135"]}
и столбец 2 — идентификатор как A123
Запись 2 : Столбец 1 — значение my_col как: {"XXX": ["123","456"],"YYY": ["246","135"], "ZZZ":["333","444"]}
и столбец 2 — идентификатор как B222
Нужно разобрать/сгладить с помощью pyspark
Ожидание :
Клавиша | Ценность | ID |
---|---|---|
ХХХ | 123 | A123 |
ХХХ | 456 | A123 |
УУУ | 246 | A123 |
УУУ | 135 | A123 |
ZZZ | 333 | В222 |
ZZZ | 444 | В222 |
Комментарии:
1. Ты что — нибудь пробовал?
Ответ №1:
Если ваш столбец является строкой, вы можете использовать from_json
и custom_schema
преобразовать его в a MapType
перед использованием explode
, чтобы извлечь его в желаемые результаты. Я предположил, что ваш начальный столбец был назван my_col
и что ваши данные находились в именованном input_df
фрейме данных .
Пример показан ниже
Подход 1: Использование api pyspark
from pyspark.sql import functions as F
from pyspark.sql import types as T
custom_schema = T.MapType(T.StringType(),T.ArrayType(T.StringType()))
output_df = (
input_df.select(
F.from_json(F.col('my_col'),custom_schema).alias('my_col_json')
)
.select(F.explode('my_col_json'))
.select(
F.col('key'),
F.explode('value')
)
)
Подход 2: Использование spark sql
# Step 1 : Create a temporary view that may be queried
input_df.createOrReplaceTempView("input_df")
# Step 2: Run the following sql on your spark session
output_df = sparkSession.sql("""
SELECT
key,
EXPLODE(value)
FROM (
SELECT
EXPLODE(from_json(my_col,"MAP<STRING,ARRAY<STRING>>"))
FROM
input_df
) t
""")
Для столбца json
Если уже json
from pyspark.sql import functions as F
output_df = (
input_df.select(F.explode('my_col_json'))
.select(
F.col('key'),
F.explode('value')
)
)
или
# Step 1 : Create a temporary view that may be queried
input_df.createOrReplaceTempView("input_df")
# Step 2: Run the following sql on your spark session
output_df = sparkSession.sql("""
SELECT
key,
EXPLODE(value)
FROM (
SELECT
EXPLODE(my_col)
FROM
input_df
) t
""")
Дайте мне знать, если это сработает для вас.
Комментарии:
1. Спасибо, что поделились множеством вариантов, я пытаюсь это сделать.
2. Как я могу реализовать этот сценарий с помощью запроса Hive QL?
3. Вы можете попробовать SQL, описанный в
spark-sql
приведенных здесь примерах, и сообщить мне, работают ли они для вас.4. когда я пытаюсь подойти 2 : Ошибка синтаксиса: недопустимый синтаксис , указывающий на КАРТУ
5. при попытке найти столбец json — — — pyspark.sql.utils. Исключение AnalysisException: u»не может разрешить «взрыв(имя таблицы.
json_column
) в связи с несоответствие типов данных: Ввод, чтобы функция explode должен быть массив или типа карты, не Строкавведите;;N в игровой форме [взорваться(json_col#3) список()]Н — SubqueryAlias имя_таблицыН — HiveTableRelationDB_name
.table_name
, орг.Апачи.в Hadoop.улей.serde2.OpenCSVSerde