Разверните фрейм данных spark и вычислите сумму из 2 строк выше

#pyspark

Вопрос:

  -------------------- ------------------- ------------ 
|            name    |               1990|1991        |
 -------------------- ------------------- ------------ 
|abc                 |100                |        300 |
|bbc                 |200                |        400 |
|cbc                 |300                |        500 |
|xyz                 |700                |        500 |
|xzz                 |200                |        500 |
 

Выше приведен фрейм данных в pyspark. Я хотел бы преобразовать это в сводную таблицу и рассчитать скользящее суммирование. Есть ли способ преобразовать столбцы в строки по имени? Это ожидаемый результат, который в основном является суммированием предыдущих 2 лет. Как мы можем этого достичь?

  -------------------- ------------------- ------------ 
|            year    |               abc |bbc         |
 -------------------- ------------------- ------------ 
|1990                 |100               |        300 |
|1991                 |100 200           |    400 300 |
 

Комментарии:

1. ваш пример вывода выглядит неверно? основываясь на вашем вводе, bbc 1990 = 200 , bbc 1991 = 400 , тогда я подумал, что этого следует ожидать: pivoted bbc 1990 = 200 , pivoted bbc 1991 = 200 400 = 600 ?

Ответ №1:

Вы можете разделить задачу на две части

  • сначала создайте сводную таблицу, а затем
  • используйте окно для расчета суммы
 from pyspark.sql import types as T
from pyspark.sql import Window

df = ...

w=Window.orderBy("year").rowsBetween(-1,0)
df.withColumnRenamed("1990", "c1990").withColumnRenamed("1991", "c1991")
    .selectExpr("name", "stack(2, '1990', c1990, '1991', c1991)") 
    .groupBy("col0") 
    .pivot("name", ["abc", "bbc"]) 
    .agg(F.first("col1")) 
    .withColumnRenamed("col0", "year") 
    .withColumn("abc_sum", F.sum("abc").over(w)) 
    .withColumn("bbc_sum", F.sum("bbc").over(w)) 
    .show()
 

Выход:

  ---- --- --- ------- ------- 
|year|abc|bbc|abc_sum|bbc_sum|
 ---- --- --- ------- ------- 
|1990|100|200|    100|    200|
|1991|300|400|    400|    600|
 ---- --- --- ------- ------- 
 

Изменить: использование всех значений из name столбца:

 df2=df.withColumnRenamed("1990", "c1990").withColumnRenamed("1991", "c1991")
    .selectExpr("name", "stack(2, '1990', c1990, '1991', c1991)") 
    .groupBy("col0") 
    .pivot("name") 
    .agg(F.first("col1")) 
    .withColumnRenamed("col0", "year")

cols =[F.col("year")]   [F.sum(col).over(w).alias(col) for col in df2.columns[1:]]
df2.select(cols).show()
 

Комментарии:

1. Что было бы, если бы мы не знали, сколько имен у нас в столбце «Имя»? Можем ли мы динамически выбирать столбцы, когда мы не уверены в том, сколько их там? В приведенном выше коде мы можем изменить это на чтение столбцов, когда размер неизвестен?

2. @ZZzzZZzz после создания сводной таблицы можно получить результирующие имена столбцов, а затем суммировать по всем столбцам. Я добавил код

3. Спасибо. Он действительно запускается, но выдает ошибку Py4JJavaError: При вызове произошла ошибка z:org.apache.spark.sql.functions.sum. : java.lang. Исключение StringIndexOutOfBoundsException: Строковый индекс вне диапазона: -1

4. Кроме того, сумма bbc_sum должна составлять 100 200 за 1990 год, а за 1991 год-400 300. Похоже, результат неверен. Он должен суммировать на основе диапазона окон предыдущего столбца.

5. @ZZzzZZzz не уверен, понимаю ли я вашу требуемую логику… для BBC значение во входных данных равно 200 для 1990 года и 400 для 1991 года. Откуда берутся эти 100?