pyspark с использованием оконной функции

#python #apache-spark #pyspark #pyspark-sql

#python #apache-spark #pyspark #apache-spark-sql

Вопрос:

У меня есть фрейм данных, который содержит строки, представляющие экземпляр рейтинга конкретного фильма пользователем. Каждый фильм может быть оценен в нескольких категориях несколькими пользователями. Это результирующий фрейм данных, который я создал, используя данные movie_lens.

 |movie_id|year|categories|
 -------- ---- ---------- 
|     122|1990|    Comedy|
|     122|1990|   Romance|
|     185|1990|    Action|
|     185|1990|     Crime|
|     185|1990|  Thriller|
|     231|1990|    Comedy|
|     292|1990|    Action|
|     292|1990|     Drama|
|     292|1990|    Sci-Fi|
|     292|1990|  Thriller|
|     316|1990|    Action|
|     316|1990| Adventure|
|     316|1990|    Sci-Fi|
|     329|1990|    Action|
|     329|1990| Adventure|
|     329|1990|     Drama|
.
.
.
  

movie_id — это уникальный идентификатор фильма, year — это год, в котором пользователь оценил фильм, category — одна из 12 категорий фильма. Частичный файл здесь

Я хочу найти фильм с наибольшим рейтингом за каждое десятилетие в каждой категории (подсчитывая частоту каждого фильма за каждое десятилетие в каждой категории)

что — то вроде

  ----------------------------------- 
| year | category | movie_id | rank |
 ----------------------------------- 
| 1990 | Comedy   | 1273     | 1    |
| 1990 | Comedy   | 6547     | 2    |
| 1990 | Comedy   | 8973     | 3    |
.
.
| 1990 | Comedy   | 7483     | 10   |
.
.
| 1990 | Drama    | 1273     | 1    |
| 1990 | Drama    | 6547     | 2    |
| 1990 | Drama    | 8973     | 3    |
.
.
| 1990 | Comedy   | 7483     | 10   |  
.
.
| 2000 | Comedy   | 1273     | 1    |
| 2000 | Comedy   | 6547     | 2    |
.
.

for every decade, top 10 movies in each category 
  

Я понимаю, что необходимо использовать оконную функцию pyspark. Это то, что я пробовал

 windowSpec = Window.partitionBy(res_agg['year']).orderBy(res_agg['categories'].desc())

final = res_agg.select(res_agg['year'], res_agg['movie_id'], res_agg['categories']).withColumn('rank', func.rank().over(windowSpec))
  

но он возвращает что-то вроде приведенного ниже:

  ---- -------- ------------------ ---- 
|year|movie_id|        categories|rank|
 ---- -------- ------------------ ---- 
|2000|    8606|(no genres listed)|   1|
|2000|    1587|            Action|   1|
|2000|    1518|            Action|   1|
|2000|    2582|            Action|   1|
|2000|    5460|            Action|   1|
|2000|   27611|            Action|   1|
|2000|   48304|            Action|   1|
|2000|   54995|            Action|   1|
|2000|    4629|            Action|   1|
|2000|   26606|            Action|   1|
|2000|   56775|            Action|   1|
|2000|   62008|            Action|   1|
  

Я довольно новичок в pyspark и застрял здесь. Кто-нибудь может подсказать мне, что я делаю неправильно.

Ответ №1:

Вы правы, вам нужно использовать window, но сначала вам нужно выполнить первую агрегацию для вычисления частот.

Сначала давайте вычислим десятилетие.

 df_decade = df.withColumn("decade", concat(substring(col("year"), 0, 3), lit("0")))
  

Затем мы вычисляем частоту по десятилетию, категории и movie_id:

 agg_df = df_decade
      .groupBy("decade", "category", "movie_id")
      .agg(count(col("*")).alias("freq"))
  

И, наконец, мы определяем окно, разделенное по десятилетиям и категориям, и выбираем 10 лучших, используя функцию ранжирования:

 w = Window.partitionBy("decade", "category").orderBy(desc("freq"))
top10 = agg_df.withColumn("r", rank().over(w)).where(col("r") <= 10)