Как получить второе по величине значение из столбца pyspark?

#python #dataframe #apache-spark #pyspark #group-by

Вопрос:

У меня есть фрейм данных PySpark, и я хотел бы получить второе по величине значение ORDERED_TIME ( yyyy-mm-dd формат поля даты и времени) после группы, примененной к 2 столбцам, а именно CUSTOMER_ID и ADDRESS_ID .

У клиента может быть много заказов, связанных с адресом, и я хотел бы получить второй по времени заказ на пару (customer,address)

Мой подход состоял в том , чтобы сделать окно и раздел в соответствии с CUSTOMER_ID и ADDRESS_ID отсортировать по ORDERED_TIME

 sorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID").orderBy(col('ORDERED_TIME').desc())

df2 = df2.withColumn("second_recent_order", (df2.select("ORDERED_TIME").collect()[1]).over(sorted_order_times))
 

Однако я получаю сообщение об ошибке ValueError: 'over' is not in list

Может ли кто-нибудь предложить правильный способ решения этой проблемы?

Пожалуйста, дайте мне знать, если потребуется какая-либо другая информация

Примеры Данных

  ----------- ---------- ------------------- 
|USER_ID    |ADDRESS_ID|       ORDER DATE  | 
 ----------- ---------- ------------------- 
|        100| 1000     |2021-01-02         |
|        100| 1000     |2021-01-14         |
|        100| 1000     |2021-01-03         |
|        100| 1000     |2021-01-04         |
|        101| 2000     |2020-05-07         |
|        101| 2000     |2021-04-14         |
 ----------- ---------- ------------------- 
 

Ожидаемый Результат

  ----------- ---------- ------------------- ------------------- 
|USER_ID    |ADDRESS_ID|       ORDER DATE  |second_recent_order
 ----------- ---------- ------------------- ------------------- 
|        100| 1000     |2021-01-02          |2021-01-04 
|        100| 1000     |2021-01-14          |2021-01-04 
|        100| 1000     |2021-01-03          |2021-01-04 
|        100| 1000     |2021-01-04          |2021-01-04 
|        101| 2000     |2020-05-07          |2020-05-07 
|        101| 2000     |2021-04-14          |2020-05-07 
 ----------- ---------- ------------------- -------------------
 

Комментарии:

1. Не могли бы вы предоставить некоторые примеры данных вместе с ожидаемым результатом?

2. Извините, что упустил это, я отредактировал вопрос с помощью примеров ввода и вывода

Ответ №1:

Вот еще один способ сделать это. С помощью collect_list

 import pyspark.sql.functions as F
from pyspark.sql import Window


sorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID").orderBy(F.col('ORDERED_TIME').desc()).rangeBetween(Window.unboundedPreceding,  Window.unboundedFollowing)
df2 = (
  df
  .withColumn("second_recent_order", (F.collect_list(F.col("ORDERED_TIME")).over(sorted_order_times))[1])
)
df2.show()
 

Конечный результат

Ответ №2:

Вы можете использовать окно здесь следующим образом, но вы получите значение null, если в группе будет только одна строка

 
sorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID").orderBy(desc('ORDERED_TIME')).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df2 = df2.withColumn(
    "second_recent_order",
    collect_list("ORDERED_TIME").over(sorted_order_times).getItem(1)
)

 

Ответ №3:

Одним из решений является создание таблицы поиска со вторыми последними заказами для всех пар CUSTOMER_ID и ADDRESS_ID , а затем объединение ее с исходным фреймом данных.
Я предполагаю, что ваша ORDERED_TIME колонка уже имеет тип метки времени.

 import pyspark.sql.functions as F
from pyspark.sql.window import Window

# define window
w = Window().partitionBy('CUSTOMER_ID', 'ADDRESS_ID').orderBy(F.desc('ORDERED_TIME'))

# create lookup table
second_highest = df 
  .withColumn('rank', F.dense_rank().over(w)) 
  .filter(F.col('rank') == 2) 
  .select('CUSTOMER_ID', 'ADDRESS_ID', 'ORDERED_TIME')

# join with original dataframe
df = df.join(second_highest, on=['CUSTOMER_ID', 'ADDRESS_ID'], how='left')

df.show()

 ----------- ---------- ------------------- ------------------- 
|CUSTOMER_ID|ADDRESS_ID|       ORDERED_TIME|       ORDERED_TIME|
 ----------- ---------- ------------------- ------------------- 
|        100| 158932441|2021-01-02 13:35:57|2021-01-04 09:36:10|
|        100| 158932441|2021-01-14 19:14:08|2021-01-04 09:36:10|
|        100| 158932441|2021-01-03 13:33:52|2021-01-04 09:36:10|
|        100| 158932441|2021-01-04 09:36:10|2021-01-04 09:36:10|
|        101| 281838494|2020-05-07 13:35:57|2020-05-07 13:35:57|
|        101| 281838494|2021-04-14 19:14:08|2020-05-07 13:35:57|
 ----------- ---------- ------------------- ------------------- 
 

Примечание: в вашем ожидаемом выпуске вы написали 2021-04-14 19:14:08 для CUSTOMER_ID == 101 , но на самом деле это 2020-05-07 13:35:57 потому, что это 2020 год.

Ответ №4:

Можно использовать два окна: отсортированное для получения строк в правильном порядке и несортированное в сочетании с функцией «первый» для получения второй строки (Scala):

 val df2 = Seq(
  (100, 158932441, "2021-01-02 13:35:57"),
  (100, 158932441, "2021-01-14 19:14:08"),
  (100, 158932441, "2021-01-03 13:33:52"),
  (100, 158932441, "2021-01-04 09:36:10"),
  (101, 281838494, "2020-05-07 13:35:57"),
  (101, 281838494, "2021-04-14 19:14:08")
).toDF("CUSTOMER_ID", "ADDRESS_ID", "ORDERED_TIME")

val sorted_order_times = Window
  .partitionBy("CUSTOMER_ID", "ADDRESS_ID")
  .orderBy(desc("ORDERED_TIME"))

val unsorted_order_times = Window
  .partitionBy("CUSTOMER_ID", "ADDRESS_ID")

df2
  .withColumn("row_number", row_number().over(sorted_order_times))
  .withColumn("second_recent_order",
  first(
    when($"row_number" === lit(2), $"ORDERED_TIME").otherwise(null), true
  ).over(unsorted_order_times))
  .drop("row_number")
 

Выход:

  ----------- ---------- ------------------- ------------------- 
|CUSTOMER_ID|ADDRESS_ID|ORDERED_TIME       |second_recent_order|
 ----------- ---------- ------------------- ------------------- 
|100        |158932441 |2021-01-14 19:14:08|2021-01-04 09:36:10|
|100        |158932441 |2021-01-04 09:36:10|2021-01-04 09:36:10|
|100        |158932441 |2021-01-03 13:33:52|2021-01-04 09:36:10|
|100        |158932441 |2021-01-02 13:35:57|2021-01-04 09:36:10|
|101        |281838494 |2021-04-14 19:14:08|2020-05-07 13:35:57|
|101        |281838494 |2020-05-07 13:35:57|2020-05-07 13:35:57|
 ----------- ---------- ------------------- -------------------