Python Spark — удаление данных после порогового значения — Pyspark

#python #apache-spark #pyspark

#python #apache-spark #pyspark

Вопрос:

Как мне удалить данные после последнего TP == 1 с буфером в 48 часов?

Например ID = A9 , последнее TP == 1 включено 2020-05-06 13:00 . Я хочу сохранить все данные для этой группы идентификаторов вплоть до 2020-05-06 13:00 последнего TP == 1 плюс следующие 2 дня?

  ---  -------- ---------------- 
| id|       TP|            Date|
 --- --------- ---------------- 
| A1|     Null|2010-01-01 12:00|
| A1|     Null|2010-01-01 13:00|
| A1|        1|2010-01-02 01:00|
| A1|     Null|2010-01-02 02:00|
| A9|     Null|2010-05-05 12:00|
| A9|        1|2010-05-05 13:00|
| A9|        1|2010-05-06 13:00|
| A9|     Null|2010-05-09 13:00|
 --- --------- ---------------- 
  

Требуемый фрейм данных

  ---  -------- ---------------- 
| id|       TP|            Date|
 --- --------- ---------------- 
| A1|     Null|2010-01-01 12:00|
| A1|     Null|2010-01-01 13:00|
| A1|        1|2010-01-02 01:00|
| A1|     Null|2010-01-02 02:00|
| A9|     Null|2010-05-05 12:00|
| A9|        1|2010-05-05 13:00|
| A9|        1|2010-05-06 13:00|
 --- --------- ---------------- 
  

Это то, что я делаю в Pandas, но это неэффективно для наблюдений 15M

 main_pd = main.toPandas()

bigdf = pd.DataFrame()

for i in main_pd.ID.unique():
  df = main_pd[main_pd.ID == i]
  TPdate = df[df.TP == 1]['Date'].max() pd.Timedelta('3 days 0 hours')
  df = df[(df.Date <= TPdate)]
  bigdf = bigdf.append(df)
  

Комментарии:

1. Это должно помочь вам получить то, что вам нужно.

Ответ №1:

IIUC, вы можете использовать функцию Window для поиска max(IF(TP=1, Date, NULL)) по каждому id , а затем фильтровать по этому пороговому значению:

 from pyspark.sql import Window, functions as F
w1 = Window.partitionBy('id')

df_new = df.withColumn('Date', F.to_timestamp('Date', 'yyyy-MM-dd HH:mm')) 
    .withColumn('threshhold_date', F.expr("max(IF(TP=1, Date, NULL))").over(w1)) 
    .filter('Date <= threshhold_date   interval 2 days') 
df_new.show()
 --- ---- ------------------- ------------------- 
| id|  TP|               Date|    threshhold_date|
 --- ---- ------------------- ------------------- 
| A9|Null|2010-05-05 12:00:00|2010-05-06 13:00:00|
| A9|   1|2010-05-05 13:00:00|2010-05-06 13:00:00|
| A9|   1|2010-05-06 13:00:00|2010-05-06 13:00:00|
| A1|Null|2010-01-01 12:00:00|2010-01-02 01:00:00|
| A1|Null|2010-01-01 13:00:00|2010-01-02 01:00:00|
| A1|   1|2010-01-02 01:00:00|2010-01-02 01:00:00|
| A1|Null|2010-01-02 02:00:00|2010-01-02 01:00:00|
 --- ---- ------------------- ------------------- 
  

Ответ №2:

Вы можете просто отфильтровать фрейм данных для TP = 1 и использовать collect()[0] для получения максимального значения Date столбца в качестве переменной.
Добавьте 48 часов к этой переменной с помощью timedelta и отфильтруйте df :

 
from pyspark.sql.functions import *
from datetime import timedelta

date_var = df.filter(col("TP")==1).orderBy("date", ascending=False)
                .collect()[0]["date"]   timedelta(hours=48)

df.filter(col("Date")<=date_var).show()

  

Комментарии:

1. О, это было о разделах. Я неправильно понял вопрос.