#pyspark #apache-spark-sql
Вопрос:
У меня есть два фрейма данных Spark, первый из которых содержит информацию Events
, как показано ниже:
ID | Идентификатор пользователя | Дата |
---|---|---|
1 | 1 | 2020-12-01 |
2 | 2 | 2021-10-10 |
Второй кадр данных содержит информацию, относящуюся к Purchase
следующему:
ID | Идентификатор пользователя | Дата | Ценность |
---|---|---|---|
1 | 1 | 2020-11-10 | 50 |
2 | 1 | 2020-10-10 | 25 |
3 | 2 | 2020-09-15 | 100 |
Я хочу объединить оба фрейма данных и создать столбец, содержащий последнее значение, еще один столбец с разницей между двумя прошлыми значениями и разницей в днях между двумя прошлыми покупками, как показано ниже:
ID | Идентификатор пользователя | Дата | Последнее значение | Различное значение | Diff_Date |
---|---|---|---|---|---|
1 | 1 | 2020-12-01 | 50 | 25 | 30 |
2 | 2 | 2021-10-10 | 100 | нулевой | нулевой |
Чтобы присоединиться к кадрам данных, я использую следующий код:
(Events.join(Purchase,
on = [Events.User_id == Purchase.User_id,
Events.Date >= Purchase.Date],
how = "left")
.withColumn('rank_date', F.rank().over(W.partitionBy(Events['Id']).orderBy(Purchase['Data'].desc())))
С помощью этого кода я могу видеть, какие покупки до события заказаны по дате, но как я могу получить доступ к значениям строк и создать столбцы на основе этих значений?
Комментарии:
1. Даты, указанные в
events
, не связаны с датами, указанными вpurchase
? Я бы подумал, что «последнее событие покупки идентификатора пользователя 1 произошло в 2020-12-01 по цене 50».2. Объединенный фрейм данных содержит Дату событий. В соединении должна содержаться только покупка, совершенная до Даты Событий, в Случае пользователя 1 2020-12-01 последняя совершенная покупка была совершена 2020-11-10 со значением 50. Но я указываю Дату покупки, когда соединяю оба кадра данных
Ответ №1:
Я думаю, что так проще поступить:
- работа с фреймом данных о покупке
- объединение / фильтрация с помощью фрейма данных событий
Затем это можно сделать следующим образом:
window_user_id = Window.partitionBy('user_id')
(
purchase
.withColumn('purchase_rank', F.rank().over(window_user_id.orderBy(F.col('date').asc())))
.withColumn('previous_value', F.lag('value', 1).over(window_user_id.orderBy(F.col('date'))))
.withColumn("diff_value", F.when(F.isnull(F.col("value") - F.col("previous_value")), 0).otherwise(F.col("value") - F.col("previous_value")))
.withColumn('diff_days', F.datediff('date', F.lag('date', 1).over(window_user_id.orderBy(F.col('date')))))
.drop("previous_value")
.show()
)
--- ------- ---------- ----- ------------- ---------- ---------
| id|user_id| date|value|purchase_rank|diff_value|diff_days|
--- ------- ---------- ----- ------------- ---------- ---------
| 2| 1|2020-10-10| 25| 1| 0| null|
| 1| 1|2020-11-10| 50| 2| 25| 31|
| 3| 2|2020-09-15| 100| 1| 0| null|
--- ------- ---------- ----- ------------- ---------- ---------
Отсюда проще объединять / фильтровать любые другие данные.
Чтобы сохранить только последние покупки каждого идентификатора пользователя:
(
purchase
.withColumn('purchase_rank', F.rank().over(window_user_id.orderBy(F.col('date').asc())))
.withColumn('previous_value', F.lag('value', 1).over(window_user_id.orderBy(F.col('date'))))
.withColumn("diff_value", F.when(F.isnull(F.col("value") - F.col("previous_value")), 0).otherwise(F.col("value") - F.col("previous_value")))
.withColumn('diff_days', F.datediff('date', F.lag('date', 1).over(window_user_id.orderBy(F.col('date')))))
.withColumn("last_purchase", F.last("purchase_rank").over(window_user_id))
.filter(F.col("purchase_rank") == F.col("last_purchase"))
.drop("previous_value", "purchase_rank", "last_purchase")
.show()
)
--- ------- ---------- ----- ---------- ---------
| id|user_id| date|value|diff_value|diff_days|
--- ------- ---------- ----- ---------- ---------
| 1| 1|2020-11-10| 50| 25| 31|
| 3| 2|2020-09-15| 100| 0| null|
--- ------- ---------- ----- ---------- ---------
Ответ №2:
Вот как я ответил на вопрос, работая с фреймом данных событий вместо работы с фреймом данных покупок:
(Events.join(Purchase,
on = [Events.User_id == Purchase.User_id,
Events.Date >= Purchase.Date],
how = "left")
.withColumn('rank_date', F.row_number().over(W.partitionBy(Events['Id']).orderBy(Purchase['Date'].desc())))
.withColumn('LastValue', when(F.col('rank_date') == 1, F.col('Value')).otherwise(F.lag('Value', 1).over(W.partitionBy(Events['Id']).orderBy(Purchase['Date'].desc()))))
.withColumn('LastDate', when(F.col('rank_date') == 1, Purchase['Date']).otherwise(F.lag(Purchase['Date'], 1).over(W.partitionBy(Events['Id']).orderBy(Purchase['Date'].desc()))))
.withColumn('totalPurchase', F.max('rank_date').over(W.partitionBy(Events['Id'])))
.withColumn('DiffValue', when(F.col('totalPurchase') == 1, 0).otherwise(F.col('LastValue') - F.col('Value')))
.withColumn('DiffDays', when(F.col('totalPurchase') == 1, None).otherwise(F.datediff(F.col('LastDate'), Purchase['Date'])))
.withColumn('keepRow', when(F.col('totalPurchase') == 1, 1).otherwise(when(F.col('rank_date') == 2, 1).otherwise(0)))
.filter(F.col('keepRow') == 1)
.drop('rank_date', 'LastDate', 'totalPurchase', 'keepRow'))