#sql #pyspark #apache-spark-sql
Вопрос:
После ответа @Vaebhav понял, что вопрос был задан неправильно. Поэтому отредактировал его с помощью своего фрагмента кода.
У меня есть следующая таблица
from pyspark.sql.types import IntegerType,TimestampType,DoubleType
input_str = """
4219,2018-01-01 08:10:00,3.0,50.78,
4216,2018-01-02 08:01:00,5.0,100.84,
4217,2018-01-02 20:00:00,4.0,800.49,
4139,2018-01-03 11:05:00,1.0,400.0,
4170,2018-01-03 09:10:00,2.0,100.0,
4029,2018-01-06 09:06:00,6.0,300.55,
4029,2018-01-06 09:16:00,2.0,310.55,
4217,2018-01-06 09:36:00,5.0,307.55,
1139,2018-01-21 11:05:00,1.0,400.0,
2170,2018-01-21 09:10:00,2.0,100.0,
4218,2018-02-06 09:36:00,5.0,307.55,
4218,2018-02-06 09:36:00,5.0,307.55
""".split(",")
input_values = list(map(lambda x: x.strip() if x.strip() != '' else None, input_str))
cols = list(map(lambda x: x.strip() if x.strip() != 'null' else None, "customer_id,timestamp,quantity,price".split(',')))
n = len(input_values)
n_cols = 4
input_list = [tuple(input_values[i:i n_cols]) for i in range(0,n,n_cols)]
sparkDF = sqlContext.createDataFrame(input_list,cols)
sparkDF = sparkDF.withColumn('customer_id',F.col('customer_id').cast(IntegerType()))
.withColumn('timestamp',F.col('timestamp').cast(TimestampType()))
.withColumn('quantity',F.col('quantity').cast(IntegerType()))
.withColumn('price',F.col('price').cast(DoubleType()))
Я хочу рассчитать аггергейт следующим образом :
trxn_date | unique_cust_visits | next_7_day_visits | next_30_day_visits |
---|---|---|---|
2018-01-01 | 1 | 7 | 9 |
2018-01-02 | 2 | 6 | 8 |
2018-01-03 | 2 | 4 | 6 |
2018-01-06 | 2 | 2 | 4 |
2018-01-21 | 2 | 2 | 3 |
2018-02-06 | 1 | 1 | 1 |
Где
- trxn_date-это дата из столбца метки времени,
- daily_cust_visits-это уникальное количество клиентов,
- next_7_day_visits-это количество клиентов за 7 — дневный период.
- next_30_day_visits-это количество клиентов в течение 30 — дневного скользящего окна.
Я хочу написать код в виде одного SQL-запроса.
Ответ №1:
Вы можете добиться этого , используя ROW
RANGE
не тип фрейма, а хорошее объяснение можно найти здесь
СТРОКА — на основе физических смещений от положения текущей входной строки
ДИАПАЗОН — на основе логических смещений от позиции текущей входной строки
Кроме того ,в вашей реализации PARTITION BY
предложение было бы избыточным, так как оно не создаст необходимого Frames
для прогнозирования.
Подготовка Данных
input_str = """
4219,2018-01-02 08:10:00,3.0,50.78,
4216,2018-01-02 08:01:00,5.0,100.84,
4217,2018-01-02 20:00:00,4.0,800.49,
4139,2018-01-03 11:05:00,1.0,400.0,
4170,2018-01-03 09:10:00,2.0,100.0,
4029,2018-01-06 09:06:00,6.0,300.55,
4029,2018-01-06 09:16:00,2.0,310.55,
4217,2018-01-06 09:36:00,5.0,307.55
""".split(",")
input_values = list(map(lambda x: x.strip() if x.strip() != '' else None, input_str))
cols = list(map(lambda x: x.strip() if x.strip() != 'null' else None, "customer_id timestamp quantity price".split('t')))
n = len(input_values)
n_cols = 4
input_list = [tuple(input_values[i:i n_cols]) for i in range(0,n,n_cols)]
sparkDF = sql.createDataFrame(input_list,cols)
sparkDF = sparkDF.withColumn('customer_id',F.col('customer_id').cast(IntegerType()))
.withColumn('timestamp',F.col('timestamp').cast(TimestampType()))
.withColumn('quantity',F.col('quantity').cast(IntegerType()))
.withColumn('price',F.col('price').cast(DoubleType()))
sparkDF.show()
----------- ------------------- -------- ------
|customer_id| timestamp|quantity| price|
----------- ------------------- -------- ------
| 4219|2018-01-02 08:10:00| 3| 50.78|
| 4216|2018-01-02 08:01:00| 5|100.84|
| 4217|2018-01-02 20:00:00| 4|800.49|
| 4139|2018-01-03 11:05:00| 1| 400.0|
| 4170|2018-01-03 09:10:00| 2| 100.0|
| 4029|2018-01-06 09:06:00| 6|300.55|
| 4029|2018-01-06 09:16:00| 2|310.55|
| 4217|2018-01-06 09:36:00| 5|307.55|
----------- ------------------- -------- ------
Агрегаты Окон
sparkDF.createOrReplaceTempView("transactions")
sql.sql("""
SELECT
TO_DATE(timestamp) as trxn_date
,COUNT(DISTINCT customer_id) as unique_cust_visits
,SUM(COUNT(DISTINCT customer_id)) OVER (
ORDER BY 'timestamp'
ROWS BETWEEN CURRENT ROW AND 7 FOLLOWING
) as next_7_day_visits
FROM transactions
GROUP BY 1
""").show()
---------- ------------------ -----------------
| trxn_date|unique_cust_visits|next_7_day_visits|
---------- ------------------ -----------------
|2018-01-02| 3| 7|
|2018-01-03| 2| 4|
|2018-01-06| 2| 2|
---------- ------------------ -----------------
Комментарии:
1. Диапазон СТРОК приведет к агрегированию непрерывных строк, однако, поскольку я имею дело с данными транзакций, диапазон подходит лучше. Я отредактировал свой вопрос, чтобы привести лучший пример. Тем не менее, ваш ответ привел меня к правильному решению 🙂
2. Я вижу , что вы сделали , ваш ответ, рад, что он работает сейчас, поднимите голос и примите ответ, если он вам помог
Ответ №2:
Основываясь на ответе @Vaebhav, требуемый запрос в этом случае таков
sqlContext.sql("""
SELECT
TO_DATE(timestamp) as trxn_date
,COUNT(DISTINCT customer_id) as unique_cust_visits
,SUM(COUNT(DISTINCT customer_id)) OVER (
ORDER BY CAST(TO_DATE(timestamp) AS TIMESTAMP) DESC
RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
) as next_7_day_visits
,SUM(COUNT(DISTINCT customer_id)) OVER (
ORDER BY CAST(TO_DATE(timestamp) AS TIMESTAMP) DESC
RANGE BETWEEN INTERVAL 30 DAYS PRECEDING AND CURRENT ROW
) as next_30_day_visits
FROM transactions
GROUP BY 1
ORDER by trxn_date
""").show()
trxn_date | unique_cust_visits | next_7_day_visits | next_30_day_visits |
---|---|---|---|
2018-01-01 | 1 | 7 | 9 |
2018-01-02 | 2 | 6 | 8 |
2018-01-03 | 2 | 4 | 6 |
2018-01-06 | 2 | 2 | 4 |
2018-01-21 | 2 | 2 | 3 |
2018-02-06 | 1 | 1 | 1 |