#python #apache-spark #pyspark #apache-spark-sql
Вопрос:
У меня есть следующие столбцы DataFrame или user_id и label. У одного пользователя может быть несколько меток.
df = spark.createDataFrame(
[(1, "a"), (2, "b"), (3, "a"), (1, "c"), (4, "b"), (5, "c"), (6, "a"), (7, "e")], ['user_id', 'label']
)
------- -----
|user_id|label|
------- -----
| 1| a|
| 2| b|
| 3| a|
| 1| c|
| 4| b|
| 5| c|
| 6| a|
| 7| e|
------- -----
Я хочу создать новый фрейм данных, содержащий 1 строку для каждого пользователя и показывающий массив всех других пользователей, с которыми они обмениваются метками:
------- -------------
|user_id| other_users|
------- -------------
| 1| [3, 5, 6]|
| 2| [4]|
| 3| [1, 6]|
| 4| [2]|
| 5| [1]|
| 6| [1, 3]|
| 7| []|
------- -------------
Каков наилучший способ достичь этого?
Ответ №1:
Вы можете присоединиться к самому фрейму данных и использовать collect_list
from pyspark.sql.functions import col, collect_list
df = (df
.join(df.selectExpr('user_id ui', 'label lb'),
[col('label') == col('lb'), col('user_id') != col('ui')],
'left')
.groupBy('user_id').agg(collect_list('ui').alias('other_users')))
df.show()
------- -----------
|user_id|other_users|
------- -----------
| 7| []|
| 6| [1, 3]|
| 5| [1]|
| 1| [5, 3, 6]|
| 3| [1, 6]|
| 2| [4]|
| 4| [2]|
------- -----------
Ответ №2:
По-другому. Я сделал это, но увидел ответ @Wai Ha Lee, который я поддержал, потому что он был более кратким. Сдерживался, но решил поделиться и предложить альтернативный способ.
h=Window.partitionBy('label')#grouper 1
g=Window.partitionBy('user_id')#grouper 2
df1=(df.withColumn('other_users',F.collect_list('user_id').over(h))#For every lable collect user_id
.withColumn("user_id", array(df['user_id']))#Convert user_id column to list
.withColumn('other_users',F.array_distinct(F.flatten(F.collect_list('other_users').over(g))))#Combine user_id lists in the other_users columns
.withColumn("other_users", array_except(col("other_users"), col("user_id"))))#Exclude user_ids
df1.show()