PySpark как создать столбец на основе значений строк

#pyspark #apache-spark-sql

Вопрос:

У меня есть два фрейма данных Spark, первый из которых содержит информацию Events , как показано ниже:

ID Идентификатор пользователя Дата
1 1 2020-12-01
2 2 2021-10-10

Второй кадр данных содержит информацию, относящуюся к Purchase следующему:

ID Идентификатор пользователя Дата Ценность
1 1 2020-11-10 50
2 1 2020-10-10 25
3 2 2020-09-15 100

Я хочу объединить оба фрейма данных и создать столбец, содержащий последнее значение, еще один столбец с разницей между двумя прошлыми значениями и разницей в днях между двумя прошлыми покупками, как показано ниже:

ID Идентификатор пользователя Дата Последнее значение Различное значение Diff_Date
1 1 2020-12-01 50 25 30
2 2 2021-10-10 100 нулевой нулевой

Чтобы присоединиться к кадрам данных, я использую следующий код:

 (Events.join(Purchase,
         on = [Events.User_id == Purchase.User_id,
               Events.Date >= Purchase.Date],
         how = "left")
   .withColumn('rank_date', F.rank().over(W.partitionBy(Events['Id']).orderBy(Purchase['Data'].desc())))
 

С помощью этого кода я могу видеть, какие покупки до события заказаны по дате, но как я могу получить доступ к значениям строк и создать столбцы на основе этих значений?

Комментарии:

1. Даты, указанные в events , не связаны с датами, указанными в purchase ? Я бы подумал, что «последнее событие покупки идентификатора пользователя 1 произошло в 2020-12-01 по цене 50».

2. Объединенный фрейм данных содержит Дату событий. В соединении должна содержаться только покупка, совершенная до Даты Событий, в Случае пользователя 1 2020-12-01 последняя совершенная покупка была совершена 2020-11-10 со значением 50. Но я указываю Дату покупки, когда соединяю оба кадра данных

Ответ №1:

Я думаю, что так проще поступить:

  • работа с фреймом данных о покупке
  • объединение / фильтрация с помощью фрейма данных событий

Затем это можно сделать следующим образом:

 window_user_id = Window.partitionBy('user_id')

(
    purchase
    .withColumn('purchase_rank', F.rank().over(window_user_id.orderBy(F.col('date').asc())))
    .withColumn('previous_value',   F.lag('value', 1).over(window_user_id.orderBy(F.col('date'))))
    .withColumn("diff_value", F.when(F.isnull(F.col("value") - F.col("previous_value")), 0).otherwise(F.col("value") - F.col("previous_value")))
    .withColumn('diff_days', F.datediff('date', F.lag('date', 1).over(window_user_id.orderBy(F.col('date')))))
    .drop("previous_value")
    .show()
)

 --- ------- ---------- ----- ------------- ---------- --------- 
| id|user_id|      date|value|purchase_rank|diff_value|diff_days|
 --- ------- ---------- ----- ------------- ---------- --------- 
|  2|      1|2020-10-10|   25|            1|         0|     null|
|  1|      1|2020-11-10|   50|            2|        25|       31|
|  3|      2|2020-09-15|  100|            1|         0|     null|
 --- ------- ---------- ----- ------------- ---------- --------- 
 

Отсюда проще объединять / фильтровать любые другие данные.

Чтобы сохранить только последние покупки каждого идентификатора пользователя:

 (
    purchase
    .withColumn('purchase_rank', F.rank().over(window_user_id.orderBy(F.col('date').asc())))
    .withColumn('previous_value',   F.lag('value', 1).over(window_user_id.orderBy(F.col('date'))))
    .withColumn("diff_value", F.when(F.isnull(F.col("value") - F.col("previous_value")), 0).otherwise(F.col("value") - F.col("previous_value")))
    .withColumn('diff_days', F.datediff('date', F.lag('date', 1).over(window_user_id.orderBy(F.col('date')))))
    .withColumn("last_purchase", F.last("purchase_rank").over(window_user_id))
    .filter(F.col("purchase_rank") == F.col("last_purchase"))
    .drop("previous_value", "purchase_rank", "last_purchase")
    .show()
)

 --- ------- ---------- ----- ---------- --------- 
| id|user_id|      date|value|diff_value|diff_days|
 --- ------- ---------- ----- ---------- --------- 
|  1|      1|2020-11-10|   50|        25|       31|
|  3|      2|2020-09-15|  100|         0|     null|
 --- ------- ---------- ----- ---------- --------- 
 

Ответ №2:

Вот как я ответил на вопрос, работая с фреймом данных событий вместо работы с фреймом данных покупок:

 (Events.join(Purchase,
             on = [Events.User_id == Purchase.User_id,
                   Events.Date >= Purchase.Date],
             how = "left")
   .withColumn('rank_date', F.row_number().over(W.partitionBy(Events['Id']).orderBy(Purchase['Date'].desc())))
   .withColumn('LastValue', when(F.col('rank_date') == 1, F.col('Value')).otherwise(F.lag('Value', 1).over(W.partitionBy(Events['Id']).orderBy(Purchase['Date'].desc()))))
   .withColumn('LastDate', when(F.col('rank_date') == 1, Purchase['Date']).otherwise(F.lag(Purchase['Date'], 1).over(W.partitionBy(Events['Id']).orderBy(Purchase['Date'].desc()))))
   .withColumn('totalPurchase', F.max('rank_date').over(W.partitionBy(Events['Id'])))
   .withColumn('DiffValue', when(F.col('totalPurchase') == 1, 0).otherwise(F.col('LastValue') - F.col('Value')))
   .withColumn('DiffDays', when(F.col('totalPurchase') == 1, None).otherwise(F.datediff(F.col('LastDate'), Purchase['Date'])))
   .withColumn('keepRow', when(F.col('totalPurchase') == 1, 1).otherwise(when(F.col('rank_date') == 2, 1).otherwise(0)))
   .filter(F.col('keepRow') == 1)
   .drop('rank_date', 'LastDate', 'totalPurchase', 'keepRow'))