Оконные и агрегатные функции в Pyspark SQL/SQL

#sql #pyspark #apache-spark-sql

Вопрос:

После ответа @Vaebhav понял, что вопрос был задан неправильно. Поэтому отредактировал его с помощью своего фрагмента кода.

У меня есть следующая таблица

 from pyspark.sql.types import IntegerType,TimestampType,DoubleType

input_str = """
4219,2018-01-01 08:10:00,3.0,50.78,
4216,2018-01-02 08:01:00,5.0,100.84,
4217,2018-01-02 20:00:00,4.0,800.49,
4139,2018-01-03 11:05:00,1.0,400.0,
4170,2018-01-03 09:10:00,2.0,100.0,
4029,2018-01-06 09:06:00,6.0,300.55,
4029,2018-01-06 09:16:00,2.0,310.55,
4217,2018-01-06 09:36:00,5.0,307.55,
1139,2018-01-21 11:05:00,1.0,400.0,
2170,2018-01-21 09:10:00,2.0,100.0,
4218,2018-02-06 09:36:00,5.0,307.55,
4218,2018-02-06 09:36:00,5.0,307.55
""".split(",")

input_values = list(map(lambda x: x.strip() if x.strip() != '' else None, input_str))
cols = list(map(lambda x: x.strip() if x.strip() != 'null' else None, "customer_id,timestamp,quantity,price".split(',')))
n = len(input_values)
n_cols = 4
input_list = [tuple(input_values[i:i n_cols]) for i in range(0,n,n_cols)]
sparkDF = sqlContext.createDataFrame(input_list,cols)
sparkDF = sparkDF.withColumn('customer_id',F.col('customer_id').cast(IntegerType()))
                 .withColumn('timestamp',F.col('timestamp').cast(TimestampType()))
                 .withColumn('quantity',F.col('quantity').cast(IntegerType()))
                 .withColumn('price',F.col('price').cast(DoubleType()))

 

Я хочу рассчитать аггергейт следующим образом :

trxn_date unique_cust_visits next_7_day_visits next_30_day_visits
2018-01-01 1 7 9
2018-01-02 2 6 8
2018-01-03 2 4 6
2018-01-06 2 2 4
2018-01-21 2 2 3
2018-02-06 1 1 1

Где

  • trxn_date-это дата из столбца метки времени,
  • daily_cust_visits-это уникальное количество клиентов,
  • next_7_day_visits-это количество клиентов за 7 — дневный период.
  • next_30_day_visits-это количество клиентов в течение 30 — дневного скользящего окна.

Я хочу написать код в виде одного SQL-запроса.

Ответ №1:

Вы можете добиться этого , используя ROW RANGE не тип фрейма, а хорошее объяснение можно найти здесь

СТРОКА — на основе физических смещений от положения текущей входной строки

ДИАПАЗОН — на основе логических смещений от позиции текущей входной строки

Кроме того ,в вашей реализации PARTITION BY предложение было бы избыточным, так как оно не создаст необходимого Frames для прогнозирования.

Подготовка Данных

 input_str = """
4219,2018-01-02 08:10:00,3.0,50.78,
4216,2018-01-02 08:01:00,5.0,100.84,
4217,2018-01-02 20:00:00,4.0,800.49,
4139,2018-01-03 11:05:00,1.0,400.0,
4170,2018-01-03 09:10:00,2.0,100.0,
4029,2018-01-06 09:06:00,6.0,300.55,
4029,2018-01-06 09:16:00,2.0,310.55,
4217,2018-01-06 09:36:00,5.0,307.55
""".split(",")

input_values = list(map(lambda x: x.strip() if x.strip() != '' else None, input_str))

cols = list(map(lambda x: x.strip() if x.strip() != 'null' else None, "customer_id  timestamp   quantity    price".split('t')))
        
n = len(input_values)
n_cols = 4

input_list = [tuple(input_values[i:i n_cols]) for i in range(0,n,n_cols)]

sparkDF = sql.createDataFrame(input_list,cols)

sparkDF = sparkDF.withColumn('customer_id',F.col('customer_id').cast(IntegerType()))
                 .withColumn('timestamp',F.col('timestamp').cast(TimestampType()))
                 .withColumn('quantity',F.col('quantity').cast(IntegerType()))
                 .withColumn('price',F.col('price').cast(DoubleType()))

sparkDF.show()

 ----------- ------------------- -------- ------ 
|customer_id|          timestamp|quantity| price|
 ----------- ------------------- -------- ------ 
|       4219|2018-01-02 08:10:00|       3| 50.78|
|       4216|2018-01-02 08:01:00|       5|100.84|
|       4217|2018-01-02 20:00:00|       4|800.49|
|       4139|2018-01-03 11:05:00|       1| 400.0|
|       4170|2018-01-03 09:10:00|       2| 100.0|
|       4029|2018-01-06 09:06:00|       6|300.55|
|       4029|2018-01-06 09:16:00|       2|310.55|
|       4217|2018-01-06 09:36:00|       5|307.55|
 ----------- ------------------- -------- ------ 

 

Агрегаты Окон

 sparkDF.createOrReplaceTempView("transactions")

sql.sql("""
        SELECT 
            TO_DATE(timestamp) as trxn_date
            ,COUNT(DISTINCT customer_id) as unique_cust_visits
            ,SUM(COUNT(DISTINCT customer_id)) OVER (
                        ORDER BY 'timestamp'
                        ROWS BETWEEN CURRENT ROW AND 7 FOLLOWING
            ) as next_7_day_visits
        FROM transactions
        GROUP BY 1
""").show()

 ---------- ------------------ ----------------- 
| trxn_date|unique_cust_visits|next_7_day_visits|
 ---------- ------------------ ----------------- 
|2018-01-02|                 3|                7|
|2018-01-03|                 2|                4|
|2018-01-06|                 2|                2|
 ---------- ------------------ ----------------- 

 

Комментарии:

1. Диапазон СТРОК приведет к агрегированию непрерывных строк, однако, поскольку я имею дело с данными транзакций, диапазон подходит лучше. Я отредактировал свой вопрос, чтобы привести лучший пример. Тем не менее, ваш ответ привел меня к правильному решению 🙂

2. Я вижу , что вы сделали , ваш ответ, рад, что он работает сейчас, поднимите голос и примите ответ, если он вам помог

Ответ №2:

Основываясь на ответе @Vaebhav, требуемый запрос в этом случае таков

 sqlContext.sql("""
        SELECT 
            TO_DATE(timestamp) as trxn_date
            ,COUNT(DISTINCT customer_id) as unique_cust_visits
            ,SUM(COUNT(DISTINCT customer_id)) OVER (
                        ORDER BY CAST(TO_DATE(timestamp) AS TIMESTAMP) DESC
                        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
            ) as next_7_day_visits
            ,SUM(COUNT(DISTINCT customer_id)) OVER (
                        ORDER BY CAST(TO_DATE(timestamp) AS TIMESTAMP) DESC
                        RANGE BETWEEN INTERVAL 30 DAYS PRECEDING AND CURRENT ROW
            ) as next_30_day_visits
        FROM transactions
        GROUP BY 1
        ORDER by trxn_date
""").show()
 
trxn_date unique_cust_visits next_7_day_visits next_30_day_visits
2018-01-01 1 7 9
2018-01-02 2 6 8
2018-01-03 2 4 6
2018-01-06 2 2 4
2018-01-21 2 2 3
2018-02-06 1 1 1