Состояние фрейма данных PySpark по окну / задержка

#apache-spark #pyspark #apache-spark-sql #rolling-computation

#apache-spark #pyspark #apache-spark-sql #переходное вычисление

Вопрос:

У меня есть фрейм данных Spark, например, так:

 # For sake of simplicity only one user (uid) is shown, but there are multiple users 
 ------------------- ----- ------- 
|start_date         |uid  |count  |
 ------------------- ----- ------- 
|2020-11-26 08:30:22|user1|  4    |
|2020-11-26 10:00:00|user1|  3    |
|2020-11-22 08:37:18|user1|  3    |
|2020-11-22 13:32:30|user1|  2    |
|2020-11-20 16:04:04|user1|  2    |
|2020-11-16 12:04:04|user1|  1    |
 

Я хочу создать новый логический столбец, значения которого равны True / False, если у пользователя было хотя бы count >= x событий в прошлом, и пометить эти события значением True. Например, для x = 3 я ожидаю получить:

  ------------------- ----- ------- -------------- 
|start_date         |uid  |count  | marked_event |
 ------------------- ----- ------- -------------- 
|2020-11-26 08:30:22|user1|  4    |  True        |
|2020-11-26 10:00:00|user1|  3    |  True        |
|2020-11-22 08:37:18|user1|  3    |  True        |
|2020-11-22 13:32:30|user1|  2    |  True        |
|2020-11-20 16:04:04|user1|  2    |  True        |
|2020-11-16 12:04:04|user1|  1    |  False       |
 

То есть для каждого количества> = 3 мне нужно пометить это событие значением True, а также предыдущие 3 события. Только последнее событие user1 является ложным, потому что я отмечаю 3 события до (и включая) событие в start_date = 2020-11-22 08:37:18 .

Есть идеи, как подойти к этому? Моя интуиция заключается в том, чтобы каким-то образом использовать window / lag для достижения этой цели, но я новичок в spark и не уверен, как это сделать…


Редактировать:

Я закончил использование варианта решения @ mck с небольшим исправлением ошибки: оригинальное решение имеет:

 F.max(F.col('begin')).over(w.rowsBetween(0, Window.unboundedFollowing))
 

условие, которое в конечном итоге помечает все события после ‘begin’, независимо от того, выполняются условия ‘count’ или нет. Вместо этого я изменил решение так, чтобы в окне отмечались только события, которые произошли до ‘begin’:

 event = (f.max(f.col('begin')).over(w.rowsBetween(-2, 0))). 
          alias('event_post_only') 
# the number of events to mark is 3 from 'begin', 
# including the event itself, so that's -2.
df_marked_events = df_marked_events.select('*', event)
 

Затем отметьте True для всех событий, которые были True в ‘event_post_only’ ИЛИ были True в ‘event_post_only’

 df_marked_events = df_marked_events.withColumn('event', (col('count') >= 3) 
                       | (col('event_post_only')))
 

Это позволяет избежать пометки True для всего, что находится выше по потоку, как ‘begin’ == True

Ответ №1:

 import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window.partitionBy('uid').orderBy(F.col('count').desc(), F.col('start_date'))

# find the beginning point of >= 3 events
begin = (
    (F.col('count') >= 3) amp;
    (F.lead(F.col('count')).over(w) < 3)
).alias('begin')
df = df.select('*', begin)

# Mark as event if the event is in any rows after begin, or two rows before begin
event = (
    F.max(F.col('begin')).over(w.rowsBetween(0, Window.unboundedFollowing)) | 
    F.max(F.col('begin')).over(w.rowsBetween(-2,0))
).alias('event')
df = df.select('*', event)

df.show()
 ------------------- ----- ----- ----- ----- 
|         start_date|  uid|count|begin|event|
 ------------------- ----- ----- ----- ----- 
|2020-11-26 08:30:22|user1|  4.0|false| true|
|2020-11-22 08:37:18|user1|  3.0|false| true|
|2020-11-26 10:00:00|user1|  3.0| true| true|
|2020-11-20 16:04:04|user1|  2.0|false| true|
|2020-11-22 13:32:30|user1|  2.0|false| true|
|2020-11-16 12:04:04|user1|  1.0|false|false|
 ------------------- ----- ----- ----- ----- 
 

Комментарии:

1. Спасибо!! Я думаю, что это подходит мне почти идеально. Мне потребовалось некоторое время, чтобы понять, почему и как это работает. Большое вам спасибо!

2. извините за отсутствие документации. надеюсь, это поможет!

3. Я добавил несколько кратких комментариев, надеюсь, это поможет другим пользователям