#apache-spark #pyspark #apache-spark-sql #rolling-computation
#apache-spark #pyspark #apache-spark-sql #переходное вычисление
Вопрос:
У меня есть фрейм данных Spark, например, так:
# For sake of simplicity only one user (uid) is shown, but there are multiple users
------------------- ----- -------
|start_date |uid |count |
------------------- ----- -------
|2020-11-26 08:30:22|user1| 4 |
|2020-11-26 10:00:00|user1| 3 |
|2020-11-22 08:37:18|user1| 3 |
|2020-11-22 13:32:30|user1| 2 |
|2020-11-20 16:04:04|user1| 2 |
|2020-11-16 12:04:04|user1| 1 |
Я хочу создать новый логический столбец, значения которого равны True / False, если у пользователя было хотя бы count >= x событий в прошлом, и пометить эти события значением True. Например, для x = 3 я ожидаю получить:
------------------- ----- ------- --------------
|start_date |uid |count | marked_event |
------------------- ----- ------- --------------
|2020-11-26 08:30:22|user1| 4 | True |
|2020-11-26 10:00:00|user1| 3 | True |
|2020-11-22 08:37:18|user1| 3 | True |
|2020-11-22 13:32:30|user1| 2 | True |
|2020-11-20 16:04:04|user1| 2 | True |
|2020-11-16 12:04:04|user1| 1 | False |
То есть для каждого количества> = 3 мне нужно пометить это событие значением True, а также предыдущие 3 события. Только последнее событие user1 является ложным, потому что я отмечаю 3 события до (и включая) событие в start_date = 2020-11-22 08:37:18 .
Есть идеи, как подойти к этому? Моя интуиция заключается в том, чтобы каким-то образом использовать window / lag для достижения этой цели, но я новичок в spark и не уверен, как это сделать…
Редактировать:
Я закончил использование варианта решения @ mck с небольшим исправлением ошибки: оригинальное решение имеет:
F.max(F.col('begin')).over(w.rowsBetween(0, Window.unboundedFollowing))
условие, которое в конечном итоге помечает все события после ‘begin’, независимо от того, выполняются условия ‘count’ или нет. Вместо этого я изменил решение так, чтобы в окне отмечались только события, которые произошли до ‘begin’:
event = (f.max(f.col('begin')).over(w.rowsBetween(-2, 0))).
alias('event_post_only')
# the number of events to mark is 3 from 'begin',
# including the event itself, so that's -2.
df_marked_events = df_marked_events.select('*', event)
Затем отметьте True для всех событий, которые были True в ‘event_post_only’ ИЛИ были True в ‘event_post_only’
df_marked_events = df_marked_events.withColumn('event', (col('count') >= 3)
| (col('event_post_only')))
Это позволяет избежать пометки True для всего, что находится выше по потоку, как ‘begin’ == True
Ответ №1:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
w = Window.partitionBy('uid').orderBy(F.col('count').desc(), F.col('start_date'))
# find the beginning point of >= 3 events
begin = (
(F.col('count') >= 3) amp;
(F.lead(F.col('count')).over(w) < 3)
).alias('begin')
df = df.select('*', begin)
# Mark as event if the event is in any rows after begin, or two rows before begin
event = (
F.max(F.col('begin')).over(w.rowsBetween(0, Window.unboundedFollowing)) |
F.max(F.col('begin')).over(w.rowsBetween(-2,0))
).alias('event')
df = df.select('*', event)
df.show()
------------------- ----- ----- ----- -----
| start_date| uid|count|begin|event|
------------------- ----- ----- ----- -----
|2020-11-26 08:30:22|user1| 4.0|false| true|
|2020-11-22 08:37:18|user1| 3.0|false| true|
|2020-11-26 10:00:00|user1| 3.0| true| true|
|2020-11-20 16:04:04|user1| 2.0|false| true|
|2020-11-22 13:32:30|user1| 2.0|false| true|
|2020-11-16 12:04:04|user1| 1.0|false|false|
------------------- ----- ----- ----- -----
Комментарии:
1. Спасибо!! Я думаю, что это подходит мне почти идеально. Мне потребовалось некоторое время, чтобы понять, почему и как это работает. Большое вам спасибо!
2. извините за отсутствие документации. надеюсь, это поможет!
3. Я добавил несколько кратких комментариев, надеюсь, это поможет другим пользователям