#pyspark
#pyspark
Вопрос:
Я хочу производить ежедневное совокупное количество уникальных посетителей веб-сайта, а встроенная функция pyspark countDistinct не работает внутри движущегося / растущего окна
Для следующих данных:
--- ----
|day|user|
--- ----
| 1| A|
| 2| B|
| 3| A|
| 4| C|
| 5| C|
| 5| B|
--- ----
ожидал бы результат:
--- ---------
|day|cum_count|
--- ---------
| 1| 1| -> [A]
| 2| 2| -> [A,B]
| 3| 2| -> [A,B]
| 4| 3| -> [A,B,C]
| 5| 3| -> [A,B,C]
--- ---------
PS: исходные данные огромны и не могут быть сброшены в pandas
Ответ №1:
Более короткое решение, которое собирает уникальных пользователей с помощью collect_set
и извлекает его длину с помощью size
:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
cum_count = F.size(
F.collect_set(F.col('user'))
.over(Window.orderBy('day').rowsBetween(Window.unboundedPreceding, 0))
).alias('cum_count')
df.select('day', cum_count)
df.show()
--- ---------
|day|cum_count|
--- ---------
| 1| 1|
| 2| 2|
| 3| 2|
| 4| 3|
| 5| 3|
| 5| 3|
--- ---------
Комментарии:
1. Это хорошее решение, но оно не работает для больших наборов данных
2. @Maviles Ну, вы не указали, что работаете с большим набором данных в своем вопросе, когда я отвечал на него
Ответ №2:
Я мог бы добиться этого результата, преобразовав проблему из cumulative count distinct в cumulative sum. Проблема заключалась в том, чтобы сохранить только первое посещение пользователя за период.
import pyspark.sql.functions as F
from pyspark.sql.window import Window
#example dataset
>>> data = sqlContext.createDataFrame([[1,'A'],[2,'B'],[3,'A'],[4,'C'],[5,'C'],[5,'B']],schema=['day','user'])
>>> data.show()
--- ----
|day|user|
--- ----
| 1| A|
| 2| B|
| 3| A|
| 4| C|
| 5| C|
| 5| B|
--- ----
#enumerate each user visit
>>> data = data.withColumn('user_visit',F.row_number().over(Window.partitionBy('user').orderBy('day')))
>>> data.orderBy('user','day').show()
--- ---- ----------
|day|user|user_visit|
--- ---- ----------
| 1| A| 1|
| 3| A| 2|
| 2| B| 1|
| 5| B| 2|
| 4| C| 1|
| 5| C| 2|
--- ---- ----------
#Keep just the first visit
>>> data = data.withColumn('first_visit',F.when(F.col('user_visit') == 1,1))
>>> data.orderBy('day','user').show()
--- ---- ---------- -----------
|day|user|user_visit|first_visit|
--- ---- ---------- -----------
| 1| A| 1| 1|
| 2| B| 1| 1|
| 3| A| 2| null|
| 4| C| 1| 1|
| 5| B| 2| null|
| 5| C| 2| null|
--- ---- ---------- -----------
# cumulative sum of first visits
>>> w = Window.partitionBy().orderBy('day').rangeBetween(Window.unboundedPreceding,0)
>>> data = data.withColumn('cum_count',F.sum('first_visit').over(w))
>>> data.orderBy('day','user').show()
--- ---- ---------- ----------- ---------
|day|user|user_visit|first_visit|cum_count|
--- ---- ---------- ----------- ---------
| 1| A| 1| 1| 1|
| 2| B| 1| 1| 2|
| 3| A| 2| null| 2|
| 4| C| 1| 1| 3|
| 5| B| 2| null| 3|
| 5| C| 2| null| 3|
--- ---- ---------- ----------- ---------
#aditional step to get the day total without duplicates
>>> data.groupBy('day').agg(F.max('cum_count')).show()
--- --------------
|day|max(cum_count)|
--- --------------
| 1| 1|
| 2| 2|
| 3| 2|
| 4| 3|
| 5| 3|
--- --------------