Обновление строк на основе следующего появления определенного значения в pyspark фрейма данных

#apache-spark #pyspark #apache-spark-sql #pyspark-dataframes

#apache-spark #apache-spark-sql #pyspark

Вопрос:

Если у меня есть фрейм данных, подобный этому

     data = [(("ID1", "ENGAGEMENT", 2019-03-03)), (("ID1", "BABY SHOWER", 2019-04-13)), (("ID1", "WEDDING", 2019-07-10)), 
           (("ID1", "DIVORCE", 2019-09-26))]
    df = spark.createDataFrame(data, ["ID", "Event", "start_date"])
    df.show()
    
     --- ----------- ---------- 
    | ID|      Event|start_date|
     --- ----------- ---------- 
    |ID1| ENGAGEMENT|2019-03-03|
    |ID1|BABY SHOWER|2019-04-13|
    |ID1|    WEDDING|2019-07-10|
    |ID1|    DIVORCE|2019-09-26|
     --- ----------- ---------- 
  

Из этого фрейма данных дата окончания события должна быть выведена на основе даты начала последующих событий

Например: если у вас есть помолвка, то она закончится, когда состоится свадьба, поэтому вы должны принять дату начала свадьбы за дату окончания помолвки.

Таким образом, приведенный выше фрейм данных должен получать этот вывод.

  --- ----------- ---------- ---------- 
| ID|      Event|start_date|  end_date|
 --- ----------- ---------- ---------- 
|ID1| ENGAGEMENT|2019-03-03|2019-07-10|
|ID1|BABY SHOWER|2019-04-13|2019-04-13|
|ID1|    WEDDING|2019-07-10|2019-09-26|
|ID1|    DIVORCE|2019-09-26|      NULL|
 --- ----------- ---------- ---------- 
  

Сначала я попытался это сделать, используя функцию lead над окном, разделенным идентификатором, чтобы получить строки впереди, но, поскольку событие «Свадьба» может произойти через 20 строк, это не сработает, и это действительно грязный способ сделать это.

 df = df.select("*", *([f.lead(f.col(c),default=None).over(Window.orderBy("ID")).alias("LEAD_" c) 
                      for c in ["Event", "start_date"]]))

activity_dates = activity_dates.select("*", *([f.lead(f.col(c),default=None).over(Window.orderBy("ID")).alias("LEAD_" c) 
                      for c in ["LEAD_Event", "LEAD_start_date"]]))


df = df.withColumn("end_date", f.when((col("Event") == "ENGAGEMENT") amp; (col("LEAD_Event") == "WEDDING"), col("LEAD_start_date"))
                                .when((col("Event") == "ENGAGEMENT") amp; (col("LEAD_LEAD_Event") == "WEDDING"), col("LEAD_LEAD_start_date"))
  

Как я могу добиться этого без циклического просмотра набора данных?

Ответ №1:

Вот моя попытка.

 from pyspark.sql import Window
from pyspark.sql.functions import *

df.withColumn('end_date', expr('''
    case when Event = 'ENGAGEMENT'  then first(if(Event = 'WEDDING', start_date, null), True) over (Partition By ID)
         when Event = 'BABY SHOWER' then first(if(Event = 'BABY SHOWER', start_date, null), True) over (Partition By ID)
         when Event = 'WEDDING'     then first(if(Event = 'DIVORCE', start_date, null), True) over (Partition By ID)
    else null end
''')).show()

 --- ----------- ---------- ---------- 
| ID|      Event|start_date|  end_date|
 --- ----------- ---------- ---------- 
|ID1| ENGAGEMENT|2019-03-03|2019-07-10|
|ID1|BABY SHOWER|2019-04-13|2019-04-13|
|ID1|    WEDDING|2019-07-10|2019-09-26|
|ID1|    DIVORCE|2019-09-26|      null|
 --- ----------- ---------- ----------