Разнесите несколько столбцов, сохранив имя столбца в PySpark

#python #apache-spark #pyspark #apache-spark-sql

Вопрос:

У меня есть следующий фрейм данных PySpark ( first_df ):

ID кошка собака птица
0 [«персан», «сфинкс»] [] [«strisores»]
1 [] [«бульдог»] [«колумбавы», «груиформы»]
2 [«тряпичная кукла»] [«лабрадор»] []

И я хотел бы разбить сразу несколько столбцов, сохранив старые имена столбцов в новом столбце, например:

ID животное animal_type
0 persan кошка
0 сфинкс кошка
0 стризоры птица
1 бульдог собака
1 колумбавес птица
1 груиформы птица
2 тряпичная кукла кошка
2 лабрадор собака

До сих пор мое текущее решение заключается в следующем:

 animal_types = ['cat', 'dog', 'bird']
df = spark.createDataFrame([], schema=StructType([
    StructField('id', StringType()),
    StructField('animal', StringType()),
    StructField('animal_type', StringType())
]))

for animal_type in animal_types:
  df = first_df 
    .select('id', animal_type) 
    .withColumn('animal', F.explode(animal_type)) 
    .drop(animal_type) 
    .withColumn('animal_type', F.lit(animal_type.upper())) 
    .union(df)
 

Но я нашел это довольно неэффективным, особенно при работе в кластерах.

Есть ли лучший способ искры для достижения этой цели?

Ответ №1:

Вы можете открепить и разорвать массив:

 df2 = df.selectExpr(
    'id', 
    'stack('   str(len(df.columns[1:]))   ', '   ', '.join(["%s, '%s'" % (col,col) for col in df.columns[1:]])   ') as (animal, animal_type)'
).withColumn(
    'animal', 
    F.explode('animal')
)

df2.show()
 --- ---------- ----------- 
| id|    animal|animal_type|
 --- ---------- ----------- 
|  0| strisores|       bird|
|  0|    persan|        cat|
|  0|    sphynx|        cat|
|  1|columbaves|       bird|
|  1|gruiformes|       bird|
|  1|   bulldog|        dog|
|  2|   ragdoll|        cat|
|  2|  labrador|        dog|
 --- ---------- -----------