#python #dataframe #pyspark #hive #pyspark-dataframes
#python #фрейм данных #улей #pyspark
Вопрос:
У меня есть дата ввода, но в качестве примера, как показано ниже: df_inp
customer_id |ph_num|date |
1 |123 |2020-10-01|
2 |456 |2020-10-01|
3 |789 |2020-10-01|
1 |654 |2020-10-02|
2 |543 |2020-10-03|
1 |908 |2020-10-04|
4 |123 |2020-10-02|
Мне нужно получить последнюю запись для каждого ежедневного процесса. Итак, я попробовал использовать операцию Windows rank (), и она работает. Но, поскольку входные данные поступают миллионами, для оптимизации производительности мы можем использовать любые другие операции spark для получения последних данных на основе customer_id и упорядочения по значению даты.
window_func = Window.partition_by("customer_id ").orderBy("date")
df = df.withColumn("rank", rank().over(window_func))
df = df.filter(df.rank == "1")
Здесь используется строка customer_id и метка даты и времени
Ответ №1:
Для Spark 3.0 , возможно, стоит проверить, имеет ли max_by (или min_by, если вы принимаете ранг 1, как в вопросе) лучшие характеристики производительности, чем window
filter
подход .
df.groupBy("customer_id").agg(F.expr("max_by(ph_num,date)"), F.max(F.col("date")))
Результат тот же, что и в вопросе. Сравнивая планы выполнения обоих подходов max_by
, у способа на одно преобразование (the filter
) меньше, но оба подхода вызовут один обмен.
Комментарии:
1. Спасибо, как я могу динамически передавать столбцы groupby и столбцы agg max?? или через переменную as??
2. @Rocky1989 строки являются обычными строками Python и могут быть заменены переменной