#python #dataframe #apache-spark #pyspark #group-by
Вопрос:
У меня есть фрейм данных PySpark, и я хотел бы получить второе по величине значение ORDERED_TIME
( yyyy-mm-dd
формат поля даты и времени) после группы, примененной к 2 столбцам, а именно CUSTOMER_ID
и ADDRESS_ID
.
У клиента может быть много заказов, связанных с адресом, и я хотел бы получить второй по времени заказ на пару (customer,address)
Мой подход состоял в том , чтобы сделать окно и раздел в соответствии с CUSTOMER_ID
и ADDRESS_ID
отсортировать по ORDERED_TIME
sorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID").orderBy(col('ORDERED_TIME').desc())
df2 = df2.withColumn("second_recent_order", (df2.select("ORDERED_TIME").collect()[1]).over(sorted_order_times))
Однако я получаю сообщение об ошибке ValueError: 'over' is not in list
Может ли кто-нибудь предложить правильный способ решения этой проблемы?
Пожалуйста, дайте мне знать, если потребуется какая-либо другая информация
Примеры Данных
----------- ---------- -------------------
|USER_ID |ADDRESS_ID| ORDER DATE |
----------- ---------- -------------------
| 100| 1000 |2021-01-02 |
| 100| 1000 |2021-01-14 |
| 100| 1000 |2021-01-03 |
| 100| 1000 |2021-01-04 |
| 101| 2000 |2020-05-07 |
| 101| 2000 |2021-04-14 |
----------- ---------- -------------------
Ожидаемый Результат
----------- ---------- ------------------- -------------------
|USER_ID |ADDRESS_ID| ORDER DATE |second_recent_order
----------- ---------- ------------------- -------------------
| 100| 1000 |2021-01-02 |2021-01-04
| 100| 1000 |2021-01-14 |2021-01-04
| 100| 1000 |2021-01-03 |2021-01-04
| 100| 1000 |2021-01-04 |2021-01-04
| 101| 2000 |2020-05-07 |2020-05-07
| 101| 2000 |2021-04-14 |2020-05-07
----------- ---------- ------------------- -------------------
Комментарии:
1. Не могли бы вы предоставить некоторые примеры данных вместе с ожидаемым результатом?
2. Извините, что упустил это, я отредактировал вопрос с помощью примеров ввода и вывода
Ответ №1:
Вот еще один способ сделать это. С помощью collect_list
import pyspark.sql.functions as F
from pyspark.sql import Window
sorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID").orderBy(F.col('ORDERED_TIME').desc()).rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df2 = (
df
.withColumn("second_recent_order", (F.collect_list(F.col("ORDERED_TIME")).over(sorted_order_times))[1])
)
df2.show()
Ответ №2:
Вы можете использовать окно здесь следующим образом, но вы получите значение null, если в группе будет только одна строка
sorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID").orderBy(desc('ORDERED_TIME')).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df2 = df2.withColumn(
"second_recent_order",
collect_list("ORDERED_TIME").over(sorted_order_times).getItem(1)
)
Ответ №3:
Одним из решений является создание таблицы поиска со вторыми последними заказами для всех пар CUSTOMER_ID
и ADDRESS_ID
, а затем объединение ее с исходным фреймом данных.
Я предполагаю, что ваша ORDERED_TIME
колонка уже имеет тип метки времени.
import pyspark.sql.functions as F
from pyspark.sql.window import Window
# define window
w = Window().partitionBy('CUSTOMER_ID', 'ADDRESS_ID').orderBy(F.desc('ORDERED_TIME'))
# create lookup table
second_highest = df
.withColumn('rank', F.dense_rank().over(w))
.filter(F.col('rank') == 2)
.select('CUSTOMER_ID', 'ADDRESS_ID', 'ORDERED_TIME')
# join with original dataframe
df = df.join(second_highest, on=['CUSTOMER_ID', 'ADDRESS_ID'], how='left')
df.show()
----------- ---------- ------------------- -------------------
|CUSTOMER_ID|ADDRESS_ID| ORDERED_TIME| ORDERED_TIME|
----------- ---------- ------------------- -------------------
| 100| 158932441|2021-01-02 13:35:57|2021-01-04 09:36:10|
| 100| 158932441|2021-01-14 19:14:08|2021-01-04 09:36:10|
| 100| 158932441|2021-01-03 13:33:52|2021-01-04 09:36:10|
| 100| 158932441|2021-01-04 09:36:10|2021-01-04 09:36:10|
| 101| 281838494|2020-05-07 13:35:57|2020-05-07 13:35:57|
| 101| 281838494|2021-04-14 19:14:08|2020-05-07 13:35:57|
----------- ---------- ------------------- -------------------
Примечание: в вашем ожидаемом выпуске вы написали 2021-04-14 19:14:08
для CUSTOMER_ID == 101
, но на самом деле это 2020-05-07 13:35:57
потому, что это 2020 год.
Ответ №4:
Можно использовать два окна: отсортированное для получения строк в правильном порядке и несортированное в сочетании с функцией «первый» для получения второй строки (Scala):
val df2 = Seq(
(100, 158932441, "2021-01-02 13:35:57"),
(100, 158932441, "2021-01-14 19:14:08"),
(100, 158932441, "2021-01-03 13:33:52"),
(100, 158932441, "2021-01-04 09:36:10"),
(101, 281838494, "2020-05-07 13:35:57"),
(101, 281838494, "2021-04-14 19:14:08")
).toDF("CUSTOMER_ID", "ADDRESS_ID", "ORDERED_TIME")
val sorted_order_times = Window
.partitionBy("CUSTOMER_ID", "ADDRESS_ID")
.orderBy(desc("ORDERED_TIME"))
val unsorted_order_times = Window
.partitionBy("CUSTOMER_ID", "ADDRESS_ID")
df2
.withColumn("row_number", row_number().over(sorted_order_times))
.withColumn("second_recent_order",
first(
when($"row_number" === lit(2), $"ORDERED_TIME").otherwise(null), true
).over(unsorted_order_times))
.drop("row_number")
Выход:
----------- ---------- ------------------- -------------------
|CUSTOMER_ID|ADDRESS_ID|ORDERED_TIME |second_recent_order|
----------- ---------- ------------------- -------------------
|100 |158932441 |2021-01-14 19:14:08|2021-01-04 09:36:10|
|100 |158932441 |2021-01-04 09:36:10|2021-01-04 09:36:10|
|100 |158932441 |2021-01-03 13:33:52|2021-01-04 09:36:10|
|100 |158932441 |2021-01-02 13:35:57|2021-01-04 09:36:10|
|101 |281838494 |2021-04-14 19:14:08|2020-05-07 13:35:57|
|101 |281838494 |2020-05-07 13:35:57|2020-05-07 13:35:57|
----------- ---------- ------------------- -------------------