Преобразование имен столбцов структуры в строки в файле parquet

#apache-spark #struct #pyspark #apache-spark-sql #parquet

#apache-spark #структура #pyspark #apache-spark-sql #parquet

Вопрос:

У меня есть образец файла данных json, как показано ниже:

 {"data_id":"1234","risk_characteristics":{"indicators":{"alcohol":true,"house":true,"business":true,"familyname":true,"swimming_pool":true}}}
{"data_id":"6789","risk_characteristics":{"indicators":{"alcohol":true,"house":true,"business":false,"familyname":true}}}
{"data_id":"5678","risk_characteristics":{"indicators":{"alcohol":false}}}

 

Я преобразовал файл json в parquet и загрузил в hive, используя приведенный ниже код

 dataDF = spark.read.json("path/Datasmall.json")
dataDF.write.parquet("data.parquet")
parqFile = spark.read.parquet("data.parquet")
parqFile.write.saveAsTable("indicators_table", format='parquet', mode='append', path='/externalpath/indicators_table/')

from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
fromHiveDF = hive_context.table("default.indicators_table")
fromHiveDF.show()

indicatorsDF = fromHiveDF.select('data_id', 'risk_characteristics.indicators')
indicatorsDF.printSchema()

root
 |-- data_id: string (nullable = true)
 |-- indicators: struct (nullable = true)
 |    |-- alcohol: boolean (nullable = true)
 |    |-- house: boolean (nullable = true)
 |    |-- business: boolean (nullable = true)
 |    |-- familyname: boolean (nullable = true)

indicatorsDF.show()
 ------- -------------------- 
|data_id|          indicators|
 ------- -------------------- 
|   1234|[true, true, true...|
|   6789|[true, false, tru...|
|   5678|         [false,,,,]|
 ------- -------------------- 
 

Вместо того, чтобы извлекать данные как select data_id, indicators.alcohol, indicators.house и т. Д,
Я просто хочу получить файл данных parquet, содержащий только 3 столбца. То есть — поля структуры преобразуются в строки под именем столбца indicators_type.

 data_id      indicators_type     indicators_value
1234         alcohol             T
1234         house               T
1234         business            T
1234         familyname          T
1234         swimming_ppol       T
6789         alcohol             T
6789         house               F
6789         business            T
6789         familyname          F
5678         alcohol             F
 

Могу я спросить, как это сделать. Я пытаюсь выполнить это с помощью pyspark. Также есть ли способ добиться этого без жесткого кодирования буквальных деталей. В моих фактических данных данные структуры могут выходить за пределы familyname, и их может быть даже 100.

Большое спасибо

Комментарии:

1. опубликовать образец данных?

2. @Srinivas я добавил образец данных

Ответ №1:

Используется stack для складывания столбцов:

 df.show()
 ------- -------------------------- 
|data_id|indicators                |
 ------- -------------------------- 
|1234   |[true, true, false, true] |
|6789   |[true, false, true, false]|
 ------- -------------------------- 

stack_expr = 'stack('   str(len(df.select('indicators.*').columns))   ', '   ', '.join(["'%s', indicators.%s" % (col,col) for col in df.select('indicators.*').columns])   ') as (indicators_type, indicators_value)'

df2 = df.selectExpr(
    'data_id',
    stack_expr
)

df2.show()
 ------- --------------- ---------------- 
|data_id|indicators_type|indicators_value|
 ------- --------------- ---------------- 
|   1234|        alcohol|            true|
|   1234|          house|            true|
|   1234|       business|           false|
|   1234|     familyname|            true|
|   6789|        alcohol|            true|
|   6789|          house|           false|
|   6789|       business|            true|
|   6789|     familyname|           false|
 ------- --------------- ---------------- 
 

Комментарии:

1. Есть ли способ добиться этого без жесткого кодирования буквальных деталей. В моих фактических данных данные структуры могут выходить за пределы familyname, и их может быть даже 100.

2. решение отлично работает с образцом входных данных. Однако мой базовый файл находится в формате parquet (я преобразовал исходные данные json в формат parquet). Когда я применяю тот же код к базовым данным parquet, я получаю следующую ошибку: вызвать исключение AnalysisException(s.split(‘: ‘, 1)[1], stackTrace) pyspark.sql.utils. Исключение AnalysisException: u»не удается разрешить ‘stack(39, ‘alcohol’, indicators . alcohol , ………………..] паркетn»

3. я обновил описание, добавив более подробную информацию о входных данных

4. решение действительно работает. я получал ошибку из-за несоответствия типов в одном из элементов структуры. Вместо того, чтобы все элементы struct были логическими, один из них был int . Есть ли способ установить indicators_value как строковый тип как часть команды stack?

5. работает отлично.. Большое спасибо @mck ! публикация обновленного выражения: stack_expr = ‘stack(‘ str(len(indDF.select(‘индикаторы.*’).столбцы)) ‘, ‘ ‘, ‘.join([«‘%s’, приведение (indicators.%s в виде строки)» % (col,col) для col в indDF.select(‘indicators.*’).columns]) ‘) как (indicators_type, indicators_value)’

Ответ №2:

Другое решение с использованием explode:

 val df = spark.sql(""" with t1( 
select 1234 data_id, named_struct('alcohol',true, 'house',false, 'business', true, 'familyname', false) as indicators
union
select 6789 data_id, named_struct('alcohol',true, 'house',false, 'business', true, 'familyname', false) as indicators
) 
select * from t1
""")

df.show(false)
df.printSchema

 ------- -------------------------- 
|data_id|indicators                |
 ------- -------------------------- 
|6789   |[true, false, true, false]|
|1234   |[true, false, true, false]|
 ------- -------------------------- 

root
 |-- data_id: integer (nullable = false)
 |-- indicators: struct (nullable = false)
 |    |-- alcohol: boolean (nullable = false)
 |    |-- house: boolean (nullable = false)
 |    |-- business: boolean (nullable = false)
 |    |-- familyname: boolean (nullable = false)


val df2 = df.withColumn("x", explode(array( 
          map(lit("alcohol") ,col("indicators.alcohol")),
          map(lit("house"), col("indicators.house")),
          map(lit("business"), col("indicators.business")),
          map(lit("familyname"), col("indicators.familyname"))
                          )))

df2.select(col("data_id"),map_keys(col("x"))(0), map_values(col("x"))(0)).show

 ------- -------------- ---------------- 
|data_id|map_keys(x)[0]|map_values(x)[0]|
 ------- -------------- ---------------- 
|   6789|       alcohol|            true|
|   6789|         house|           false|
|   6789|      business|            true|
|   6789|    familyname|           false|
|   1234|       alcohol|            true|
|   1234|         house|           false|
|   1234|      business|            true|
|   1234|    familyname|           false|
 ------- -------------- ---------------- 
 

Обновление-1:

Чтобы индикаторы динамически структурировали столбцы, используйте приведенный ниже подход.

 val colsx = df.select("indicators.*").columns

colsx: Array[String] = Array(alcohol, house, business, familyname)

val exp1 = colsx.map( x => s""" map("${x}", indicators.${x})  """ ).mkString(",")

val exp2 = " explode(array( "   exp1   " )) "

val df2 = df.withColumn("x",expr(exp2))

df2.select(col("data_id"),map_keys(col("x"))(0).as("indicator_key"), map_values(col("x"))(0).as("indicator_value")).show

 ------- ------------- --------------- 
|data_id|indicator_key|indicator_value|
 ------- ------------- --------------- 
|   6789|      alcohol|           true|
|   6789|        house|          false|
|   6789|     business|           true|
|   6789|   familyname|          false|
|   1234|      alcohol|           true|
|   1234|        house|          false|
|   1234|     business|           true|
|   1234|   familyname|          false|
 ------- ------------- --------------- 
 

Комментарии:

1. Это помогает, но есть ли способ добиться этого без жесткого кодирования буквальных деталей. В моих фактических данных данные структуры могут выходить за пределы familyname, и их может быть даже 100. Я добавлю этот пункт также в основной раздел

2. можете ли вы проверить update1 в ответе.