#python #apache-spark #pyspark #apache-spark-sql #window-functions
#python #apache-spark #pyspark #apache-spark-sql #оконные функции
Вопрос:
у меня есть фрейм данных, структурированный следующим образом:
------------------
| id | value|
------------------
| user0| 100 |
| user1| 102 |
| user0| 109 |
| user2| 103 |
| user1| 108 |
| user0| 119 |
| user0| 140 |
| user0| 142 |
------------------
Я хотел бы вычислить разницу между каждой строкой и предшествующей для каждого идентификатора, для этого я попробовал следующий код:
import pyspark.sql.functions as F
w_vv = Window.partitionBy('id')
df=df.withColumn('variances',F.round(F.var_pop("value"),2).over(w_vv.rowsBetween(Window.unboundedPreceding,0)))
это идеальный желаемый результат
--------------------------------------------------------------
| User| value| variances|
--------------------------------------------------------------
| user0| value1| - |
| user1| value1| - |
| user0| value2| variance(value2,value1) |
| user1| value2| variance(value2,value1) |
| user1| value3| variance(value3,value2,value1) |
| user0| value3| variance(value4,value3,value2,value1) |
| user0| value4| variance(value4,value3,value2,value1) |
| user0| value5| variance(value5,value4,value3,value2,value1)|
--------------------------------------------------------------
предыдущий вывод с числами в качестве примера:
---------------------------
| User| value| variances|
---------------------------
| user0| 2| - |
| user1| 4| - |
| user0| 3| 0.25 |
| user1| 3| 0.25 |
| user1| 9| 6.9 |
| user0| 7| 4.7 |
| user0| 3| 3.7 |
| user0| 4| 3 |
---------------------------
однако код возвращает следующую ошибку
grouping expressions sequence is empty, and '`timestamp`' is not an aggregate function.
Wrap '(var_pop(CAST(`value` AS DOUBLE)) AS `_w0`)' in windowing function(s) or wrap
'`timestamp`' in first() (or first_value) if you don't care which value you get.;;
Я понимаю, что агрегатные функции следует использовать поверх groupBy, но я не знаю, как это закодировать, чтобы это работало, есть идеи? Спасибо
Комментарии:
1. можете ли вы добавить ожидаемый числовой результат?
2. @thebluephantom конечно, готово.
Ответ №1:
Вы должны прикрепить окно к var_pop
, а не round
:
w_vv = Window.partitionBy('id')
df = df.withColumn('variances',
F.round(
F.var_pop("value")
.over(w_vv.rowsBetween(Window.unboundedPreceding,0)
, 2)
)