#python #apache-spark #pyspark
Вопрос:
У меня есть фрейм данных pyspark:
--- ------------------
|Id |Friend_Ids|Points |
------ ------- -------
|1 |[2, 5] | 5 |
|2 |[1, 3, 4] | 6 |
|3 |[2, 4] | 2 |
|4 |[3, 5, 2] | 12 |
|5 |[1, 4] | 1 |
------ ------- -------
Я хочу получить столбец с суммой очков друзей каждого идентификатора:
--- ------------------ -------------------
|Id |Friend_Ids|Points |Friends_points_sum |
------ ------- ------- -------------------
|1 |[2, 5] | 5 | 7 |
|2 |[1, 3, 4] | 6 | 19 |
|3 |[2, 4] | 2 | 18 |
|4 |[2, 3, 5] | 12 | 9 |
|5 |[1, 4] | 1 | 17 |
------ ------- ------- -------------------
Я уже пробовал это
df.withColumn("friends_points_sum",df.filter(F.col('Id').isin(F.col('Friends_Ids'))
.agg(F.sum('points')).collect()[0][0])
и получить TypeError: 'Column' object is not callable
Я также пробовал udf, как
def sum_agg(array, df):
id = array[0]
friend_ids = array[1]
points = array[2]
s = df.filter(F.col(id).isin(friend_id)).agg(F.sum('points')).collect()[0][0]
return s.tolist()
points_sum = F.udf(qnty_agg, IntegerType())
df.withColumn("friends_points_sum", qnty_sum(F.array('id','Friend_Ids','Points'), df))
но он не принимает df в качестве аргумента
Ответ №1:
Возможно, вам захочется сначала перейти explode
в столбец friend_ids, затем самостоятельно присоединиться к фрейму данных для поиска значений точек, а затем, наконец, агрегировать значения
df = (df
.selectExpr('id', 'points', 'explode(friend_ids) fi')
.join(df.selectExpr('id i', 'points pts'), F.col('fi') == F.col('i'), 'inner')
.groupby('id', 'points')
.agg(F.collect_list('fi').alias('friend_ids'), F.sum('pts').alias('friend_points_sum')))
df.show()
--- ------ ---------- -----------------
| id|points|friend_ids|friend_points_sum|
--- ------ ---------- -----------------
| 3| 2| [2, 4]| 18|
| 1| 5| [5, 2]| 7|
| 5| 1| [1, 4]| 17|
| 4| 12| [5, 2, 3]| 9|
| 2| 6| [1, 3, 4]| 19|
--- ------ ---------- -----------------