Для каждого дня x, для каждого пропущенного уникального значения y столбца z создайте строку с датой=x и z=самое последнее значение y

#sql #apache-spark #pyspark #apache-spark-sql

Вопрос:

У меня есть фрейм данных pyspark, в котором указано, на каком Date элементе было что Stock в конкретном Size . Составной ключ таблицы — [Дата, размер]. Учитывая, что элемент имеет n размеры, для каждой даты n может существовать от 0 до строк.

 input = spark.createDataFrame([
    # Day 1: Row for all sizes
    [1, 1, 10],
    [1, 2, 10],
    [1, 3, 10],
    # Day 2: Row for one size
    [2, 1, 8],
    # Day 3: Row for no size
    # Day 4: Row for two sizes
    [4, 1, 7],
    [4, 2, 9],
], ["Date", "Size", "Stock"])
 

Например, на второй день в целом было продано две штуки товара размером 1, что сократило его запас для этого размера с 10 до 8. В этот день не было транзакций для размера 1 или 3.

Я хотел бы вычислить Stock , какой элемент был у каждого из них Sizes на каждом Date . Ожидаемый результат выглядит следующим образом:

 expected = spark.createDataFrame([
    # Day 1
    [1, 1, 10],
    [1, 2, 10],
    [1, 3, 10],
    # Day 2
    [2, 1, 8],
    [2, 2, 10],
    [2, 3, 10],
    # Day 3
    [3, 1, 8],
    [3, 2, 10],
    [3, 3, 10],
    # Day 4
    [4, 1, 7],
    [4, 2, 9],
    [4, 3, 10],
], ["Date", "Size", "Stock"])
 

Как я мог этого достичь?

Ответ №1:

Идея состоит в том, чтобы сгенерировать последовательность дат из минимальной и максимальной даты, перекрестное соединение, чтобы получить список комбинаций даты и размера, левое соединение с исходным кадром данных и получить предыдущую стоимость акций, используя значение last со ignoreNulls значением True .

 from pyspark.sql import functions as F, Window

df = input.agg(F.expr('sequence(min(Date), max(Date)) as Date')).select(F.explode('Date').alias('Date'))

result = df.crossJoin(
    input.select('Size').distinct().repartition(10)
).join(
    input, 
    ['Date', 'Size'], 
    'left'
).withColumn(
    'Stock', 
    F.last('Stock', True).over(Window.partitionBy('Size').orderBy('Date'))
)

result.orderBy('Date', 'Size').show()
 ---- ---- ----- 
|Date|Size|Stock|
 ---- ---- ----- 
|   1|   1|   10|
|   1|   2|   10|
|   1|   3|   10|
|   2|   1|    8|
|   2|   2|   10|
|   2|   3|   10|
|   3|   1|    8|
|   3|   2|   10|
|   3|   3|   10|
|   4|   1|    7|
|   4|   2|    9|
|   4|   3|   10|
 ---- ---- -----