#python #apache-spark #pyspark
#python #apache-spark #pyspark
Вопрос:
Как мне удалить данные после последнего TP == 1
с буфером в 48 часов?
Например ID = A9
, последнее TP == 1
включено 2020-05-06 13:00
. Я хочу сохранить все данные для этой группы идентификаторов вплоть до 2020-05-06 13:00
последнего TP == 1
плюс следующие 2 дня?
--- -------- ----------------
| id| TP| Date|
--- --------- ----------------
| A1| Null|2010-01-01 12:00|
| A1| Null|2010-01-01 13:00|
| A1| 1|2010-01-02 01:00|
| A1| Null|2010-01-02 02:00|
| A9| Null|2010-05-05 12:00|
| A9| 1|2010-05-05 13:00|
| A9| 1|2010-05-06 13:00|
| A9| Null|2010-05-09 13:00|
--- --------- ----------------
Требуемый фрейм данных
--- -------- ----------------
| id| TP| Date|
--- --------- ----------------
| A1| Null|2010-01-01 12:00|
| A1| Null|2010-01-01 13:00|
| A1| 1|2010-01-02 01:00|
| A1| Null|2010-01-02 02:00|
| A9| Null|2010-05-05 12:00|
| A9| 1|2010-05-05 13:00|
| A9| 1|2010-05-06 13:00|
--- --------- ----------------
Это то, что я делаю в Pandas, но это неэффективно для наблюдений 15M
main_pd = main.toPandas()
bigdf = pd.DataFrame()
for i in main_pd.ID.unique():
df = main_pd[main_pd.ID == i]
TPdate = df[df.TP == 1]['Date'].max() pd.Timedelta('3 days 0 hours')
df = df[(df.Date <= TPdate)]
bigdf = bigdf.append(df)
Комментарии:
1. Это должно помочь вам получить то, что вам нужно.
Ответ №1:
IIUC, вы можете использовать функцию Window для поиска max(IF(TP=1, Date, NULL))
по каждому id
, а затем фильтровать по этому пороговому значению:
from pyspark.sql import Window, functions as F
w1 = Window.partitionBy('id')
df_new = df.withColumn('Date', F.to_timestamp('Date', 'yyyy-MM-dd HH:mm'))
.withColumn('threshhold_date', F.expr("max(IF(TP=1, Date, NULL))").over(w1))
.filter('Date <= threshhold_date interval 2 days')
df_new.show()
--- ---- ------------------- -------------------
| id| TP| Date| threshhold_date|
--- ---- ------------------- -------------------
| A9|Null|2010-05-05 12:00:00|2010-05-06 13:00:00|
| A9| 1|2010-05-05 13:00:00|2010-05-06 13:00:00|
| A9| 1|2010-05-06 13:00:00|2010-05-06 13:00:00|
| A1|Null|2010-01-01 12:00:00|2010-01-02 01:00:00|
| A1|Null|2010-01-01 13:00:00|2010-01-02 01:00:00|
| A1| 1|2010-01-02 01:00:00|2010-01-02 01:00:00|
| A1|Null|2010-01-02 02:00:00|2010-01-02 01:00:00|
--- ---- ------------------- -------------------
Ответ №2:
Вы можете просто отфильтровать фрейм данных для TP = 1
и использовать collect()[0]
для получения максимального значения Date
столбца в качестве переменной.
Добавьте 48 часов к этой переменной с помощью timedelta
и отфильтруйте df
:
from pyspark.sql.functions import *
from datetime import timedelta
date_var = df.filter(col("TP")==1).orderBy("date", ascending=False)
.collect()[0]["date"] timedelta(hours=48)
df.filter(col("Date")<=date_var).show()
Комментарии:
1. О, это было о разделах. Я неправильно понял вопрос.