#python #apache-spark
#python #apache-spark
Вопрос:
У меня есть фрейм данных, похожий на:
id | date | value
--- | ---------- | ------
1 | 2016-01-07 | 13.90
1 | 2016-01-16 | 14.50
2 | 2016-01-09 | 10.50
2 | 2016-01-28 | 5.50
3 | 2016-01-05 | 1.50
Я пытаюсь сохранить самые последние значения для каждого идентификатора, например:
id | date | value
--- | ---------- | ------
1 | 2016-01-16 | 14.50
2 | 2016-01-28 | 5.50
3 | 2016-01-05 | 1.50
Я пробовал сортировать по дате desc и после удаления дубликатов:
new_df = df.orderBy(df.date.desc()).dropDuplicates(['id'])
Мои вопросы: dropDuplicates()
сохранит ли первое повторяющееся значение, которое он найдет? и есть ли лучший способ выполнить то, что я хочу сделать? Кстати, я использую python.
Спасибо.
Комментарии:
1. Посмотрите на оконные функции . В частности, вы захотите использовать
row_number()
илиrank()
в зависимости от того, как вы хотите обрабатывать связанные первые.
Ответ №1:
Предложенный оператор window очень хорошо подходит для решения этой проблемы:
from datetime import date
rdd = sc.parallelize([
[1, date(2016, 1, 7), 13.90],
[1, date(2016, 1, 16), 14.50],
[2, date(2016, 1, 9), 10.50],
[2, date(2016, 1, 28), 5.50],
[3, date(2016, 1, 5), 1.50]
])
df = rdd.toDF(['id','date','price'])
df.show()
--- ---------- -----
| id| date|price|
--- ---------- -----
| 1|2016-01-07| 13.9|
| 1|2016-01-16| 14.5|
| 2|2016-01-09| 10.5|
| 2|2016-01-28| 5.5|
| 3|2016-01-05| 1.5|
--- ---------- -----
df.registerTempTable("entries") // Replaced by createOrReplaceTempView in Spark 2.0
output = sqlContext.sql('''
SELECT
*
FROM (
SELECT
*,
dense_rank() OVER (PARTITION BY id ORDER BY date DESC) AS rank
FROM entries
) vo WHERE rank = 1
''');
output.show();
--- ---------- ----- ----
| id| date|price|rank|
--- ---------- ----- ----
| 1|2016-01-16| 14.5| 1|
| 2|2016-01-28| 5.5| 1|
| 3|2016-01-05| 1.5| 1|
--- ---------- ----- ----
Комментарии:
1. Отличный материал, простое и практичное решение!
Ответ №2:
Если у вас есть элементы с одинаковой датой, вы получите дубликаты с dense_rank . Вы должны использовать row_number:
from pyspark.sql.window import Window
from datetime import date
import pyspark.sql.functions as F
rdd = spark.sparkContext.parallelize([
[1, date(2016, 1, 7), 13.90],
[1, date(2016, 1, 7), 10.0 ], # I added this row to show the effect of duplicate
[1, date(2016, 1, 16), 14.50],
[2, date(2016, 1, 9), 10.50],
[2, date(2016, 1, 28), 5.50],
[3, date(2016, 1, 5), 1.50]]
)
df = rdd.toDF(['id','date','price'])
df.show(10)
--- ---------- -----
| id| date|price|
--- ---------- -----
| 1|2016-01-07| 13.9|
| 1|2016-01-07| 10.0|
| 1|2016-01-16| 14.5|
| 2|2016-01-09| 10.5|
| 2|2016-01-28| 5.5|
| 3|2016-01-05| 1.5|
--- ---------- -----
# row_number
df.withColumn("row_number",F.row_number().over(Window.partitionBy(df.id).orderBy(df.date))).filter(F.col("row_number")==1).show()
--- ---------- ----- ----------
| id| date|price|row_number|
--- ---------- ----- ----------
| 3|2016-01-05| 1.5| 1|
| 1|2016-01-07| 13.9| 1|
| 2|2016-01-09| 10.5| 1|
--- ---------- ----- ----------
# dense_rank
df.withColumn("dense_rank",F.dense_rank().over(Window.partitionBy(df.id).orderBy(df.date))).filter(F.col("dense_rank")==1).show()
--- ---------- ----- ----------
| id| date|price|dense_rank|
--- ---------- ----- ----------
| 3|2016-01-05| 1.5| 1|
| 1|2016-01-07| 13.9| 1|
| 1|2016-01-07| 10.0| 1|
| 2|2016-01-09| 10.5| 1|
--- ---------- ----- ----------
Ответ №3:
Вы можете использовать row_number, чтобы получить запись с последней датой:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
new_df = df.withColumn("row_number",F.row_number().over(Window.partitionBy(df.id).orderBy(df.date.desc()))).filter(F.col("row_number")==1).drop("row_number")