spark dataframe сохраняет самую последнюю запись

#python #apache-spark

#python #apache-spark

Вопрос:

У меня есть фрейм данных, похожий на:

 id  | date       | value
--- | ---------- | ------
1   | 2016-01-07 | 13.90
1   | 2016-01-16 | 14.50
2   | 2016-01-09 | 10.50
2   | 2016-01-28 | 5.50
3   | 2016-01-05 | 1.50
 

Я пытаюсь сохранить самые последние значения для каждого идентификатора, например:

 id  | date       | value
--- | ---------- | ------
1   | 2016-01-16 | 14.50
2   | 2016-01-28 | 5.50
3   | 2016-01-05 | 1.50
 

Я пробовал сортировать по дате desc и после удаления дубликатов:

 new_df = df.orderBy(df.date.desc()).dropDuplicates(['id'])   
 

Мои вопросы: dropDuplicates() сохранит ли первое повторяющееся значение, которое он найдет? и есть ли лучший способ выполнить то, что я хочу сделать? Кстати, я использую python.

Спасибо.

Комментарии:

1. Посмотрите на оконные функции . В частности, вы захотите использовать row_number() или rank() в зависимости от того, как вы хотите обрабатывать связанные первые.

Ответ №1:

Предложенный оператор window очень хорошо подходит для решения этой проблемы:

 from datetime import date

rdd = sc.parallelize([
    [1, date(2016, 1, 7), 13.90],
    [1, date(2016, 1, 16), 14.50],
    [2, date(2016, 1, 9), 10.50],
    [2, date(2016, 1, 28), 5.50],
    [3, date(2016, 1, 5), 1.50]
])

df = rdd.toDF(['id','date','price'])
df.show()

 --- ---------- ----- 
| id|      date|price|
 --- ---------- ----- 
|  1|2016-01-07| 13.9|
|  1|2016-01-16| 14.5|
|  2|2016-01-09| 10.5|
|  2|2016-01-28|  5.5|
|  3|2016-01-05|  1.5|
 --- ---------- ----- 

df.registerTempTable("entries") // Replaced by createOrReplaceTempView in Spark 2.0

output = sqlContext.sql('''
    SELECT 
        *
    FROM (
        SELECT 
            *,
            dense_rank() OVER (PARTITION BY id ORDER BY date DESC) AS rank
        FROM entries
    ) vo WHERE rank = 1
''');

output.show();

 --- ---------- ----- ---- 
| id|      date|price|rank|
 --- ---------- ----- ---- 
|  1|2016-01-16| 14.5|   1|
|  2|2016-01-28|  5.5|   1|
|  3|2016-01-05|  1.5|   1|
 --- ---------- ----- ---- 
 

Комментарии:

1. Отличный материал, простое и практичное решение!

Ответ №2:

Если у вас есть элементы с одинаковой датой, вы получите дубликаты с dense_rank . Вы должны использовать row_number:

 from pyspark.sql.window import Window
from datetime import date
​import pyspark.sql.functions as F 

rdd = spark.sparkContext.parallelize([
    [1, date(2016, 1, 7), 13.90],
    [1, date(2016, 1, 7), 10.0 ], # I added this row to show the effect of duplicate
    [1, date(2016, 1, 16), 14.50],
    [2, date(2016, 1, 9), 10.50],
    [2, date(2016, 1, 28), 5.50],
    [3, date(2016, 1, 5), 1.50]]
)
​
df = rdd.toDF(['id','date','price'])
df.show(10)

 --- ---------- ----- 
| id|      date|price|
 --- ---------- ----- 
|  1|2016-01-07| 13.9|
|  1|2016-01-07| 10.0|
|  1|2016-01-16| 14.5|
|  2|2016-01-09| 10.5|
|  2|2016-01-28|  5.5|
|  3|2016-01-05|  1.5|
 --- ---------- ----- 


# row_number
df.withColumn("row_number",F.row_number().over(Window.partitionBy(df.id).orderBy(df.date))).filter(F.col("row_number")==1).show()
​
 --- ---------- ----- ---------- 
| id|      date|price|row_number|
 --- ---------- ----- ---------- 
|  3|2016-01-05|  1.5|         1|
|  1|2016-01-07| 13.9|         1|
|  2|2016-01-09| 10.5|         1|
 --- ---------- ----- ---------- 

# dense_rank
df.withColumn("dense_rank",F.dense_rank().over(Window.partitionBy(df.id).orderBy(df.date))).filter(F.col("dense_rank")==1).show()

 --- ---------- ----- ---------- 
| id|      date|price|dense_rank|
 --- ---------- ----- ---------- 
|  3|2016-01-05|  1.5|         1|
|  1|2016-01-07| 13.9|         1|
|  1|2016-01-07| 10.0|         1|
|  2|2016-01-09| 10.5|         1|
 --- ---------- ----- ---------- 

 

Ответ №3:

Вы можете использовать row_number, чтобы получить запись с последней датой:

 import pyspark.sql.functions as F
from pyspark.sql.window import Window

new_df = df.withColumn("row_number",F.row_number().over(Window.partitionBy(df.id).orderBy(df.date.desc()))).filter(F.col("row_number")==1).drop("row_number")