pyspark кумулятивный счетчик различен

#pyspark

#pyspark

Вопрос:

Я хочу производить ежедневное совокупное количество уникальных посетителей веб-сайта, а встроенная функция pyspark countDistinct не работает внутри движущегося / растущего окна

Для следующих данных:

  --- ---- 
|day|user|
 --- ---- 
|  1|   A|
|  2|   B|
|  3|   A|
|  4|   C|
|  5|   C|
|  5|   B|
 --- ---- 
 

ожидал бы результат:

  --- --------- 
|day|cum_count|
 --- --------- 
|  1|        1| -> [A]
|  2|        2| -> [A,B]
|  3|        2| -> [A,B]
|  4|        3| -> [A,B,C]
|  5|        3| -> [A,B,C]
 --- --------- 
 

PS: исходные данные огромны и не могут быть сброшены в pandas

Ответ №1:

Более короткое решение, которое собирает уникальных пользователей с помощью collect_set и извлекает его длину с помощью size :

 import pyspark.sql.functions as F
from pyspark.sql.window import Window

cum_count = F.size(
    F.collect_set(F.col('user'))
     .over(Window.orderBy('day').rowsBetween(Window.unboundedPreceding, 0))
).alias('cum_count')

df.select('day', cum_count)
df.show()
    
 --- --------- 
|day|cum_count|
 --- --------- 
|  1|        1|
|  2|        2|
|  3|        2|
|  4|        3|
|  5|        3|
|  5|        3|
 --- --------- 
 

Комментарии:

1. Это хорошее решение, но оно не работает для больших наборов данных

2. @Maviles Ну, вы не указали, что работаете с большим набором данных в своем вопросе, когда я отвечал на него

Ответ №2:

Я мог бы добиться этого результата, преобразовав проблему из cumulative count distinct в cumulative sum. Проблема заключалась в том, чтобы сохранить только первое посещение пользователя за период.

 import pyspark.sql.functions as F
from pyspark.sql.window import Window

#example dataset
>>> data = sqlContext.createDataFrame([[1,'A'],[2,'B'],[3,'A'],[4,'C'],[5,'C'],[5,'B']],schema=['day','user'])
>>> data.show()
 --- ---- 
|day|user|
 --- ---- 
|  1|   A|
|  2|   B|
|  3|   A|
|  4|   C|
|  5|   C|
|  5|   B|
 --- ---- 

#enumerate each user visit
>>> data = data.withColumn('user_visit',F.row_number().over(Window.partitionBy('user').orderBy('day')))
>>> data.orderBy('user','day').show()
 --- ---- ---------- 
|day|user|user_visit|
 --- ---- ---------- 
|  1|   A|         1|
|  3|   A|         2|
|  2|   B|         1|
|  5|   B|         2|
|  4|   C|         1|
|  5|   C|         2|
 --- ---- ---------- 

#Keep just the first visit
>>> data = data.withColumn('first_visit',F.when(F.col('user_visit') == 1,1))
>>> data.orderBy('day','user').show()
 --- ---- ---------- ----------- 
|day|user|user_visit|first_visit|
 --- ---- ---------- ----------- 
|  1|   A|         1|          1|
|  2|   B|         1|          1|
|  3|   A|         2|       null|
|  4|   C|         1|          1|
|  5|   B|         2|       null|
|  5|   C|         2|       null|
 --- ---- ---------- ----------- 

# cumulative sum of first visits
>>> w = Window.partitionBy().orderBy('day').rangeBetween(Window.unboundedPreceding,0)
>>> data = data.withColumn('cum_count',F.sum('first_visit').over(w))
>>> data.orderBy('day','user').show()
 --- ---- ---------- ----------- --------- 
|day|user|user_visit|first_visit|cum_count|
 --- ---- ---------- ----------- --------- 
|  1|   A|         1|          1|        1|
|  2|   B|         1|          1|        2|
|  3|   A|         2|       null|        2|
|  4|   C|         1|          1|        3|
|  5|   B|         2|       null|        3|
|  5|   C|         2|       null|        3|
 --- ---- ---------- ----------- --------- 

#aditional step to get the day total without duplicates
>>> data.groupBy('day').agg(F.max('cum_count')).show()
 --- -------------- 
|day|max(cum_count)|
 --- -------------- 
|  1|             1|
|  2|             2|
|  3|             2|
|  4|             3|
|  5|             3|
 --- --------------