PySpark: как сгруппировать по фиксированному диапазону дат и другому столбцу, вычисляющему сумму столбца значений, используя оконные функции?

#python #apache-spark #pyspark #apache-spark-sql #pyspark-sql

#python #apache-spark #pyspark #apache-spark-sql

Вопрос:

У меня есть фрейм данных Spark, состоящий из трех столбцов: Date , Item и Value типов Date , String и Double соответственно. Я хотел бы сгруппировать по диапазону дат (где длительность каждого диапазона составляет 7 дней, начиная с первой даты в dataframe и выше) и элементу, и вычислить суммы значений для каждой такой группы, определяемой диапазоном дат (фактически номером недели) и элементом.

Я подозреваю, что оконные функции PySpark должны быть использованы в какой-то момент здесь для диапазонов дат, но не могу понять, как их реализовать в этом случае.

Ответ №1:

Давайте сначала определим подход для этого —

(a) Добавьте столбец week_start_date для строки (каждой даты)

(b) Используйте столбец week_start_date в group by (вместе с ‘item’) и вычислите сумму «value»

Сгенерируйте некоторые тестовые данные

 from pyspark.sql.types import *

schema = StructType([StructField('date', StringType(),True),
                     StructField('item', StringType(),True),
                     StructField('value', DoubleType(),True)
    ]
    )

data = [('2019-01-01','I1',1.1),
        ('2019-01-02','I1',1.1),
        ('2019-01-10','I1',1.1),
        ('2019-01-10','I2',1.1),
        ('2019-01-11','I2',1.1),
        ('2019-01-11','I3',1.1)]

df = spark.createDataFrame(data, schema)
  

Функция Python для генерации week_start_date

 from datetime import datetime, timedelta

def week_start_date(day):
    dt = datetime.strptime(day, '%Y-%m-%d')
    start = dt - timedelta(days=dt.weekday())
    end = start   timedelta(days=6)
    return start.strftime('%Y-%m-%d')

spark.udf.register('week_start_date',week_start_date)
  

Используйте функцию для генерации week_start_date, а затем сгруппируйте по week_start_date и элементу

  df.selectExpr("week_start_date(date) as start_date","date","item as item","value as value" ).
        groupBy("start_date","item").
        agg(sum('value').alias('value_sum')).
        orderBy("start_date").
        show()