#apache-spark #struct #pyspark #apache-spark-sql #parquet
#apache-spark #структура #pyspark #apache-spark-sql #parquet
Вопрос:
У меня есть образец файла данных json, как показано ниже:
{"data_id":"1234","risk_characteristics":{"indicators":{"alcohol":true,"house":true,"business":true,"familyname":true,"swimming_pool":true}}}
{"data_id":"6789","risk_characteristics":{"indicators":{"alcohol":true,"house":true,"business":false,"familyname":true}}}
{"data_id":"5678","risk_characteristics":{"indicators":{"alcohol":false}}}
Я преобразовал файл json в parquet и загрузил в hive, используя приведенный ниже код
dataDF = spark.read.json("path/Datasmall.json")
dataDF.write.parquet("data.parquet")
parqFile = spark.read.parquet("data.parquet")
parqFile.write.saveAsTable("indicators_table", format='parquet', mode='append', path='/externalpath/indicators_table/')
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
fromHiveDF = hive_context.table("default.indicators_table")
fromHiveDF.show()
indicatorsDF = fromHiveDF.select('data_id', 'risk_characteristics.indicators')
indicatorsDF.printSchema()
root
|-- data_id: string (nullable = true)
|-- indicators: struct (nullable = true)
| |-- alcohol: boolean (nullable = true)
| |-- house: boolean (nullable = true)
| |-- business: boolean (nullable = true)
| |-- familyname: boolean (nullable = true)
indicatorsDF.show()
------- --------------------
|data_id| indicators|
------- --------------------
| 1234|[true, true, true...|
| 6789|[true, false, tru...|
| 5678| [false,,,,]|
------- --------------------
Вместо того, чтобы извлекать данные как select data_id, indicators.alcohol, indicators.house и т. Д,
Я просто хочу получить файл данных parquet, содержащий только 3 столбца. То есть — поля структуры преобразуются в строки под именем столбца indicators_type.
data_id indicators_type indicators_value
1234 alcohol T
1234 house T
1234 business T
1234 familyname T
1234 swimming_ppol T
6789 alcohol T
6789 house F
6789 business T
6789 familyname F
5678 alcohol F
Могу я спросить, как это сделать. Я пытаюсь выполнить это с помощью pyspark. Также есть ли способ добиться этого без жесткого кодирования буквальных деталей. В моих фактических данных данные структуры могут выходить за пределы familyname, и их может быть даже 100.
Большое спасибо
Комментарии:
1. опубликовать образец данных?
2. @Srinivas я добавил образец данных
Ответ №1:
Используется stack
для складывания столбцов:
df.show()
------- --------------------------
|data_id|indicators |
------- --------------------------
|1234 |[true, true, false, true] |
|6789 |[true, false, true, false]|
------- --------------------------
stack_expr = 'stack(' str(len(df.select('indicators.*').columns)) ', ' ', '.join(["'%s', indicators.%s" % (col,col) for col in df.select('indicators.*').columns]) ') as (indicators_type, indicators_value)'
df2 = df.selectExpr(
'data_id',
stack_expr
)
df2.show()
------- --------------- ----------------
|data_id|indicators_type|indicators_value|
------- --------------- ----------------
| 1234| alcohol| true|
| 1234| house| true|
| 1234| business| false|
| 1234| familyname| true|
| 6789| alcohol| true|
| 6789| house| false|
| 6789| business| true|
| 6789| familyname| false|
------- --------------- ----------------
Комментарии:
1. Есть ли способ добиться этого без жесткого кодирования буквальных деталей. В моих фактических данных данные структуры могут выходить за пределы familyname, и их может быть даже 100.
2. решение отлично работает с образцом входных данных. Однако мой базовый файл находится в формате parquet (я преобразовал исходные данные json в формат parquet). Когда я применяю тот же код к базовым данным parquet, я получаю следующую ошибку: вызвать исключение AnalysisException(s.split(‘: ‘, 1)[1], stackTrace) pyspark.sql.utils. Исключение AnalysisException: u»не удается разрешить ‘stack(39, ‘alcohol’,
indicators
.alcohol
, ………………..] паркетn»3. я обновил описание, добавив более подробную информацию о входных данных
4. решение действительно работает. я получал ошибку из-за несоответствия типов в одном из элементов структуры. Вместо того, чтобы все элементы struct были логическими, один из них был int . Есть ли способ установить indicators_value как строковый тип как часть команды stack?
5. работает отлично.. Большое спасибо @mck ! публикация обновленного выражения: stack_expr = ‘stack(‘ str(len(indDF.select(‘индикаторы.*’).столбцы)) ‘, ‘ ‘, ‘.join([«‘%s’, приведение (indicators.%s в виде строки)» % (col,col) для col в indDF.select(‘indicators.*’).columns]) ‘) как (indicators_type, indicators_value)’
Ответ №2:
Другое решение с использованием explode:
val df = spark.sql(""" with t1(
select 1234 data_id, named_struct('alcohol',true, 'house',false, 'business', true, 'familyname', false) as indicators
union
select 6789 data_id, named_struct('alcohol',true, 'house',false, 'business', true, 'familyname', false) as indicators
)
select * from t1
""")
df.show(false)
df.printSchema
------- --------------------------
|data_id|indicators |
------- --------------------------
|6789 |[true, false, true, false]|
|1234 |[true, false, true, false]|
------- --------------------------
root
|-- data_id: integer (nullable = false)
|-- indicators: struct (nullable = false)
| |-- alcohol: boolean (nullable = false)
| |-- house: boolean (nullable = false)
| |-- business: boolean (nullable = false)
| |-- familyname: boolean (nullable = false)
val df2 = df.withColumn("x", explode(array(
map(lit("alcohol") ,col("indicators.alcohol")),
map(lit("house"), col("indicators.house")),
map(lit("business"), col("indicators.business")),
map(lit("familyname"), col("indicators.familyname"))
)))
df2.select(col("data_id"),map_keys(col("x"))(0), map_values(col("x"))(0)).show
------- -------------- ----------------
|data_id|map_keys(x)[0]|map_values(x)[0]|
------- -------------- ----------------
| 6789| alcohol| true|
| 6789| house| false|
| 6789| business| true|
| 6789| familyname| false|
| 1234| alcohol| true|
| 1234| house| false|
| 1234| business| true|
| 1234| familyname| false|
------- -------------- ----------------
Обновление-1:
Чтобы индикаторы динамически структурировали столбцы, используйте приведенный ниже подход.
val colsx = df.select("indicators.*").columns
colsx: Array[String] = Array(alcohol, house, business, familyname)
val exp1 = colsx.map( x => s""" map("${x}", indicators.${x}) """ ).mkString(",")
val exp2 = " explode(array( " exp1 " )) "
val df2 = df.withColumn("x",expr(exp2))
df2.select(col("data_id"),map_keys(col("x"))(0).as("indicator_key"), map_values(col("x"))(0).as("indicator_value")).show
------- ------------- ---------------
|data_id|indicator_key|indicator_value|
------- ------------- ---------------
| 6789| alcohol| true|
| 6789| house| false|
| 6789| business| true|
| 6789| familyname| false|
| 1234| alcohol| true|
| 1234| house| false|
| 1234| business| true|
| 1234| familyname| false|
------- ------------- ---------------
Комментарии:
1. Это помогает, но есть ли способ добиться этого без жесткого кодирования буквальных деталей. В моих фактических данных данные структуры могут выходить за пределы familyname, и их может быть даже 100. Я добавлю этот пункт также в основной раздел
2. можете ли вы проверить update1 в ответе.