Как использовать функцию окна pyspark dataframe

#python #dataframe #apache-spark #pyspark

#python #фрейм данных #apache-spark #pyspark

Вопрос:

У меня есть фрейм данных, как показано ниже

введите описание изображения здесь

Я хочу получить фрейм данных, который будет иметь самую последнюю версию с последней датой.Первым критерием фильтрации будет последняя версия, а затем последняя дата. Результирующий фрейм данных должен выглядеть следующим образом

введите описание изображения здесь

Я использую функцию окна для достижения этой цели.Я написал ниже фрагмент кода.

 wind = Window.partitionBy("id")
data = data.withColumn("maxVersion", F.max("version").over(wind)) 
               .withColumn("maxDt", F.max("dt").over(wind)) 
               .where(F.col("version") == F.col("maxVersion")) 
               .where(F.col("maxDt") == F.col("dt")) 
               .drop(F.col("maxVersion")) 
               .drop(F.col("maxDt"))
  

Я не уверен, где я что-то упускаю.Я получаю только один вывод с идентификатором 100.
Пожалуйста, помогите мне решить эту проблему

Ответ №1:

Как вы упомянули, в вашей работе есть порядок: сначала версия, затем dt В принципе, вам нужно выбрать только максимальную версию (удалив все остальное), а затем выбрать максимальный dt и удалить все остальное. Вам просто нужно переключить 2 строки следующим образом :

 wind = Window.partitionBy("id")
data = data.withColumn("maxVersion", F.max("version").over(wind)) 
               .where(F.col("version") == F.col("maxVersion")) 
               .withColumn("maxDt", F.max("dt").over(wind)) 
               .where(F.col("maxDt") == F.col("dt")) 
               .drop(F.col("maxVersion")) 
               .drop(F.col("maxDt"))
  

Причина, по которой вы получили только одну строку для идентификатора 100, заключается в том, что в этом случае максимальная версия и максимальный dt происходят в одной строке (вам повезло). Но это неверно для идентификатора 200.

Комментарии:

1. Я думаю, это тот ответ, который я искал. Это работает. Спасибо за ваш ответ.

Ответ №2:

В принципе, есть несколько проблем с вашей формулировкой. Сначала вам нужно изменить дату из строки в ее надлежащий формат даты. Затем Window в pyspark позволяет указать порядок следования столбцов один за другим. Затем есть rank() функция, которая позволяет вам ранжировать результаты по всему окну. Наконец, все, что остается, это выбрать первый ранг.

 from pyspark.sql.types import *
from pyspark import SparkContext, SQLContext
import pyspark.sql.functions as F
from pyspark.sql import Window

sc = SparkContext('local')
sqlContext = SQLContext(sc)

data1 = [
        (100,1,"2020-03-19","Nil1"),
        (100,2,"2020-04-19","Nil2"),
        (100,2,"2020-04-19","Nil2"),
        (100,2,"2020-05-19","Ni13"),
        (200,1,"2020-09-19","Jay1"),
        (200,2,"2020-07-19","Jay2"),
        (200,2,"2020-08-19","Jay3"),

      ]

df1Columns = ["id", "version", "dt",  "Name"]
df1 = sqlContext.createDataFrame(data=data1, schema = df1Columns)
df1 = df1.withColumn("dt",F.to_date(F.to_timestamp("dt", 'yyyy-MM-dd')).alias('dt'))
print("Schema.")
df1.printSchema()
print("Actual initial data")
df1.show(truncate=False)

wind = Window.partitionBy("id").orderBy(F.desc("version"), F.desc("dt"))

df1 = df1.withColumn("rank", F.rank().over(wind))
print("Ranking over the window spec specified")
df1.show(truncate=False)

final_df = df1.filter(F.col("rank") == 1).drop("rank")
print("Filtering the final result by applying the rank == 1 condition")
final_df.show(truncate=False)
  

Вывод :

 Schema.
root
 |-- id: long (nullable = true)
 |-- version: long (nullable = true)
 |-- dt: date (nullable = true)
 |-- Name: string (nullable = true)

Actual initial data
 --- ------- ---------- ---- 
|id |version|dt        |Name|
 --- ------- ---------- ---- 
|100|1      |2020-03-19|Nil1|
|100|2      |2020-04-19|Nil2|
|100|2      |2020-04-19|Nil2|
|100|2      |2020-05-19|Ni13|
|200|1      |2020-09-19|Jay1|
|200|2      |2020-07-19|Jay2|
|200|2      |2020-08-19|Jay3|
 --- ------- ---------- ---- 

Ranking over the window spec specified
 --- ------- ---------- ---- ---- 
|id |version|dt        |Name|rank|
 --- ------- ---------- ---- ---- 
|100|2      |2020-05-19|Ni13|1   |
|100|2      |2020-04-19|Nil2|2   |
|100|2      |2020-04-19|Nil2|2   |
|100|1      |2020-03-19|Nil1|4   |
|200|2      |2020-08-19|Jay3|1   |
|200|2      |2020-07-19|Jay2|2   |
|200|1      |2020-09-19|Jay1|3   |
 --- ------- ---------- ---- ---- 

Filtering the final result by applying the rank == 1 condition
 --- ------- ---------- ---- 
|id |version|dt        |Name|
 --- ------- ---------- ---- 
|100|2      |2020-05-19|Ni13|
|200|2      |2020-08-19|Jay3|
 --- ------- ---------- ---- 
  

Комментарии:

1. Хорошее решение. Но, если позволите, это не очень эффективно, поскольку включает сортировку всего фрейма данных.

2. @SimonDelecourt: только по идентификатору partitionBy.

3. Спасибо за ваш резонанс. Я действительно пробовал это, и это работает. Я блуждал, есть ли способ справиться с этим без использования ранга. Я предполагаю, что ранг вызовет проблемы с производительностью, поскольку нам нужно отсортировать весь фрейм данных, и произойдет перетасовка

4. @Nils: В основном это зависит от того, сколько строк на идентификатор у вас есть. Поскольку partitionBy гарантирует, что сортируются только строки в пределах этого идентификатора. Но я не уверен, есть ли у Catalyst optimizer оптимизация для этого случая, когда ранг, за которым следует фильтр, немедленно приводит к максимальной семантике. Я думаю, вам придется взглянуть на физические планы или эксперименты, чтобы проверить это.

Ответ №3:

Возможно, более аккуратным способом является выполнение следующего:

 w = Window.partitionBy("id").orderBy(F.col('version').desc(), F.col('dt').desc())
df1.withColumn('maximum', F.row_number().over(w)).filter('maximum = 1').drop('maximum').show()