Создайте группу фреймов данных с помощью collect_list и заполните недостающие значения 0

#apache-spark #pyspark

Вопрос:

У меня есть фрейм данных такого рода

  ------ -------- -------- 
|    id|category|quantity|
 ------ -------- -------- 
|merch1|   fruit|    20.0|
|merch1| veggies|   300.0|
|merch1|   diary|    10.0|
|merch1|organics|    12.0|
|merch1|  frozen|    11.0|
|merch2|   fruit|     6.0|
|merch2|   diary|     6.0|
|merch2|  frozen|     8.0|
|merch3| veggies|    13.0|
|merch3|organics|     4.0|
|merch3|  frozen|    10.0|
|merch4|   fruit|    28.0|
|merch4|organics|    11.0|
 ------ -------- -------- 
 

Я хотел бы сгруппировать идентификатор и сгенерировать вектор количества в виде упорядоченного списка коллекций, чтобы, если категория отсутствует, она заполнялась 0.0. Различными категориями являются

  --------                                                                       
|category|
 -------- 
|  frozen|
|   diary|
| veggies|
|organics|
|   fruit|
 -------- 
 

Например, для merch1, так как все присутствует, вектор
для merch1 это [11.0, 10.0, 300.0, 12.0, 20.0]
для merch2 это [8.0, 6.0, 0.0, 0.0, 6.0]
для merch3 это [10.0, 0.0, 13.0, 4.0, 0.0]
для merch4 это [0.0, 0.0, 0.0, 28.0, 11.0]

Следовательно, окончательный кадр данных, который я ищу, это

  ------ -------- ---------------------- 
|    id| vector                        |
 ------ -------- ---------------------- 
|merch1|[11.0, 10.0, 300.0, 12.0, 20.0]|
|merch2|[8.0, 6.0, 0.0, 0.0, 6.0]      |
|merch3|[10.0, 0.0, 13.0, 4.0, 0.0]    |
|merch4|[0.0, 0.0, 0.0, 28.0, 11.0]    |
 ------ ------------------------------- 
 

Ответ №1:

Мы можем достичь этого в два этапа: преобразовать строки в столбцы на уровне группы(здесь id ) с помощью pivot array функции SQL и использовать ее для создания списка, как и ожидалось.

 from pyspark.sql import function as f
df.show()
 ------ -------- -------- 
|    id|category|quantity|
 ------ -------- -------- 
|merch1|   fruit|    20.0|
|merch1| veggies|   300.0|
|merch1|   diary|    10.0|
|merch1|organics|    12.0|
|merch1|  frozen|    11.0|
|merch2|   fruit|     6.0|
|merch2|   diary|     6.0|
|merch2|  frozen|     8.0|
|merch3| veggies|    13.0|
|merch3|organics|     4.0|
|merch3|  frozen|    10.0|
|merch4|   fruit|    28.0|
|merch4|organics|    11.0|
 ------ -------- --------   
    
df1 = df.groupby('id').pivot('category').agg(f.first('quantity')).fillna(0)
df1.show()
 ------ ----- ------ ----- -------- ------- 
|    id|diary|frozen|fruit|organics|veggies|
 ------ ----- ------ ----- -------- ------- 
|merch2|  6.0|   8.0|  6.0|     0.0|    0.0|
|merch4|  0.0|   0.0| 28.0|    11.0|    0.0|
|merch1| 10.0|  11.0| 20.0|    12.0|  300.0|
|merch3|  0.0|  10.0|  0.0|     4.0|   13.0|
 ------ ----- ------ ----- -------- ------- 
    
df1.select('id',f.array(df1.columns[1:]).name('vector')).show(truncate=False)
 ------ ------------------------------- 
|id    |vector                         |
 ------ ------------------------------- 
|merch2|[6.0, 8.0, 6.0, 0.0, 0.0]      |
|merch4|[0.0, 0.0, 28.0, 11.0, 0.0]    |
|merch1|[10.0, 11.0, 20.0, 12.0, 300.0]|
|merch3|[0.0, 10.0, 0.0, 4.0, 13.0]    |
 ------ ------------------------------- 
 

Комментарии:

1. Мне пришлось внести изменения, чтобы сделать его адаптируемым к фрейму данных spark.