#python #apache-spark #pyspark #pyspark-sql
#python #apache-spark #pyspark #apache-spark-sql
Вопрос:
У меня есть фрейм данных, который содержит строки, представляющие экземпляр рейтинга конкретного фильма пользователем. Каждый фильм может быть оценен в нескольких категориях несколькими пользователями. Это результирующий фрейм данных, который я создал, используя данные movie_lens.
|movie_id|year|categories|
-------- ---- ----------
| 122|1990| Comedy|
| 122|1990| Romance|
| 185|1990| Action|
| 185|1990| Crime|
| 185|1990| Thriller|
| 231|1990| Comedy|
| 292|1990| Action|
| 292|1990| Drama|
| 292|1990| Sci-Fi|
| 292|1990| Thriller|
| 316|1990| Action|
| 316|1990| Adventure|
| 316|1990| Sci-Fi|
| 329|1990| Action|
| 329|1990| Adventure|
| 329|1990| Drama|
.
.
.
movie_id — это уникальный идентификатор фильма, year — это год, в котором пользователь оценил фильм, category — одна из 12 категорий фильма. Частичный файл здесь
Я хочу найти фильм с наибольшим рейтингом за каждое десятилетие в каждой категории (подсчитывая частоту каждого фильма за каждое десятилетие в каждой категории)
что — то вроде
-----------------------------------
| year | category | movie_id | rank |
-----------------------------------
| 1990 | Comedy | 1273 | 1 |
| 1990 | Comedy | 6547 | 2 |
| 1990 | Comedy | 8973 | 3 |
.
.
| 1990 | Comedy | 7483 | 10 |
.
.
| 1990 | Drama | 1273 | 1 |
| 1990 | Drama | 6547 | 2 |
| 1990 | Drama | 8973 | 3 |
.
.
| 1990 | Comedy | 7483 | 10 |
.
.
| 2000 | Comedy | 1273 | 1 |
| 2000 | Comedy | 6547 | 2 |
.
.
for every decade, top 10 movies in each category
Я понимаю, что необходимо использовать оконную функцию pyspark. Это то, что я пробовал
windowSpec = Window.partitionBy(res_agg['year']).orderBy(res_agg['categories'].desc())
final = res_agg.select(res_agg['year'], res_agg['movie_id'], res_agg['categories']).withColumn('rank', func.rank().over(windowSpec))
но он возвращает что-то вроде приведенного ниже:
---- -------- ------------------ ----
|year|movie_id| categories|rank|
---- -------- ------------------ ----
|2000| 8606|(no genres listed)| 1|
|2000| 1587| Action| 1|
|2000| 1518| Action| 1|
|2000| 2582| Action| 1|
|2000| 5460| Action| 1|
|2000| 27611| Action| 1|
|2000| 48304| Action| 1|
|2000| 54995| Action| 1|
|2000| 4629| Action| 1|
|2000| 26606| Action| 1|
|2000| 56775| Action| 1|
|2000| 62008| Action| 1|
Я довольно новичок в pyspark и застрял здесь. Кто-нибудь может подсказать мне, что я делаю неправильно.
Ответ №1:
Вы правы, вам нужно использовать window, но сначала вам нужно выполнить первую агрегацию для вычисления частот.
Сначала давайте вычислим десятилетие.
df_decade = df.withColumn("decade", concat(substring(col("year"), 0, 3), lit("0")))
Затем мы вычисляем частоту по десятилетию, категории и movie_id:
agg_df = df_decade
.groupBy("decade", "category", "movie_id")
.agg(count(col("*")).alias("freq"))
И, наконец, мы определяем окно, разделенное по десятилетиям и категориям, и выбираем 10 лучших, используя функцию ранжирования:
w = Window.partitionBy("decade", "category").orderBy(desc("freq"))
top10 = agg_df.withColumn("r", rank().over(w)).where(col("r") <= 10)