#sql #apache-spark #pyspark #apache-spark-sql
Вопрос:
У меня есть фрейм данных pyspark, в котором указано, на каком Date
элементе было что Stock
в конкретном Size
. Составной ключ таблицы — [Дата, размер]. Учитывая, что элемент имеет n
размеры, для каждой даты n
может существовать от 0 до строк.
input = spark.createDataFrame([
# Day 1: Row for all sizes
[1, 1, 10],
[1, 2, 10],
[1, 3, 10],
# Day 2: Row for one size
[2, 1, 8],
# Day 3: Row for no size
# Day 4: Row for two sizes
[4, 1, 7],
[4, 2, 9],
], ["Date", "Size", "Stock"])
Например, на второй день в целом было продано две штуки товара размером 1, что сократило его запас для этого размера с 10 до 8. В этот день не было транзакций для размера 1 или 3.
Я хотел бы вычислить Stock
, какой элемент был у каждого из них Sizes
на каждом Date
. Ожидаемый результат выглядит следующим образом:
expected = spark.createDataFrame([
# Day 1
[1, 1, 10],
[1, 2, 10],
[1, 3, 10],
# Day 2
[2, 1, 8],
[2, 2, 10],
[2, 3, 10],
# Day 3
[3, 1, 8],
[3, 2, 10],
[3, 3, 10],
# Day 4
[4, 1, 7],
[4, 2, 9],
[4, 3, 10],
], ["Date", "Size", "Stock"])
Как я мог этого достичь?
Ответ №1:
Идея состоит в том, чтобы сгенерировать последовательность дат из минимальной и максимальной даты, перекрестное соединение, чтобы получить список комбинаций даты и размера, левое соединение с исходным кадром данных и получить предыдущую стоимость акций, используя значение last
со ignoreNulls
значением True
.
from pyspark.sql import functions as F, Window
df = input.agg(F.expr('sequence(min(Date), max(Date)) as Date')).select(F.explode('Date').alias('Date'))
result = df.crossJoin(
input.select('Size').distinct().repartition(10)
).join(
input,
['Date', 'Size'],
'left'
).withColumn(
'Stock',
F.last('Stock', True).over(Window.partitionBy('Size').orderBy('Date'))
)
result.orderBy('Date', 'Size').show()
---- ---- -----
|Date|Size|Stock|
---- ---- -----
| 1| 1| 10|
| 1| 2| 10|
| 1| 3| 10|
| 2| 1| 8|
| 2| 2| 10|
| 2| 3| 10|
| 3| 1| 8|
| 3| 2| 10|
| 3| 3| 10|
| 4| 1| 7|
| 4| 2| 9|
| 4| 3| 10|
---- ---- -----