Еще один способ получения последних записей на основе значения даты

#python #dataframe #pyspark #hive #pyspark-dataframes

#python #фрейм данных #улей #pyspark

Вопрос:

У меня есть дата ввода, но в качестве примера, как показано ниже: df_inp

 customer_id  |ph_num|date      |
1            |123   |2020-10-01|
2            |456   |2020-10-01|
3            |789   |2020-10-01|
1            |654   |2020-10-02|
2            |543   |2020-10-03|
1            |908   |2020-10-04|
4            |123   |2020-10-02|
  

Мне нужно получить последнюю запись для каждого ежедневного процесса. Итак, я попробовал использовать операцию Windows rank (), и она работает. Но, поскольку входные данные поступают миллионами, для оптимизации производительности мы можем использовать любые другие операции spark для получения последних данных на основе customer_id и упорядочения по значению даты.

 window_func = Window.partition_by("customer_id ").orderBy("date")
df = df.withColumn("rank", rank().over(window_func))
df = df.filter(df.rank == "1")
  

Здесь используется строка customer_id и метка даты и времени

Ответ №1:

Для Spark 3.0 , возможно, стоит проверить, имеет ли max_by (или min_by, если вы принимаете ранг 1, как в вопросе) лучшие характеристики производительности, чем window filter подход .

 df.groupBy("customer_id").agg(F.expr("max_by(ph_num,date)"), F.max(F.col("date")))
  

Результат тот же, что и в вопросе. Сравнивая планы выполнения обоих подходов max_by , у способа на одно преобразование (the filter ) меньше, но оба подхода вызовут один обмен.

Комментарии:

1. Спасибо, как я могу динамически передавать столбцы groupby и столбцы agg max?? или через переменную as??

2. @Rocky1989 строки являются обычными строками Python и могут быть заменены переменной