Искровая вычислительная дисперсия над окном

#python #apache-spark #pyspark #apache-spark-sql #window-functions

#python #apache-spark #pyspark #apache-spark-sql #оконные функции

Вопрос:

у меня есть фрейм данных, структурированный следующим образом:

  ------------------ 
|   id  |     value|
 ------------------ 
|  user0|     100  |
|  user1|     102  |
|  user0|     109  |
|  user2|     103  |
|  user1|     108  |
|  user0|     119  |
|  user0|     140  |
|  user0|     142  |
 ------------------ 
 

Я хотел бы вычислить разницу между каждой строкой и предшествующей для каждого идентификатора, для этого я попробовал следующий код:

 import pyspark.sql.functions as F

w_vv = Window.partitionBy('id')  
df=df.withColumn('variances',F.round(F.var_pop("value"),2).over(w_vv.rowsBetween(Window.unboundedPreceding,0)))
 

это идеальный желаемый результат

  -------------------------------------------------------------- 
|   User|  value|                                     variances|
 -------------------------------------------------------------- 
|  user0| value1|         -                                    |
|  user1| value1|         -                                    |
|  user0| value2|  variance(value2,value1)                     |
|  user1| value2|  variance(value2,value1)                     |
|  user1| value3|  variance(value3,value2,value1)              |
|  user0| value3|  variance(value4,value3,value2,value1)       |
|  user0| value4|  variance(value4,value3,value2,value1)       |
|  user0| value5|  variance(value5,value4,value3,value2,value1)|
 -------------------------------------------------------------- 
 

предыдущий вывод с числами в качестве примера:

  --------------------------- 
|   User|  value|  variances|
 --------------------------- 
|  user0| 2|         -      |
|  user1| 4|         -      |
|  user0| 3| 0.25           |
|  user1| 3| 0.25           |
|  user1| 9| 6.9            |
|  user0| 7| 4.7            |
|  user0| 3| 3.7            |     
|  user0| 4| 3              |
 --------------------------- 
 

однако код возвращает следующую ошибку

 grouping expressions sequence is empty, and '`timestamp`' is not an aggregate function.  
Wrap '(var_pop(CAST(`value` AS DOUBLE)) AS `_w0`)' in windowing function(s) or wrap  
'`timestamp`' in first() (or first_value) if you don't care which value you get.;;
 

Я понимаю, что агрегатные функции следует использовать поверх groupBy, но я не знаю, как это закодировать, чтобы это работало, есть идеи? Спасибо

Комментарии:

1. можете ли вы добавить ожидаемый числовой результат?

2. @thebluephantom конечно, готово.

Ответ №1:

Вы должны прикрепить окно к var_pop , а не round :

 w_vv = Window.partitionBy('id')  

df = df.withColumn('variances',
    F.round(
        F.var_pop("value")
         .over(w_vv.rowsBetween(Window.unboundedPreceding,0)
    , 2)
)