#apache-spark #apache-spark-sql
#apache-spark #apache-spark-sql
Вопрос:
Ниже приведены данные о продажах, доступные для расчета max_price . Логика для Max_price
Max(last 3 weeks price)
Для первых 3 недель, когда данные за последние недели недоступны, максимальная цена будет
max of(week 1 , week 2 , week 3)
в приведенном ниже примере максимум (ранг 5, 6, 7).
как реализовать то же самое, используя функцию window в spark?
Комментарии:
1. Обновите свой образец ввода и ожидаемый результат.
2. Все заданные значения являются входными, а max_price — это выходные данные, которые необходимо добавить в набор данных.
Ответ №1:
Вот решение с использованием PySpark Window, lead / udf.
Пожалуйста, обратите внимание, что я изменил цены ранга 5,6,7 на 1,2,3, чтобы дифференцировать их с другими значениями для объяснения . что эта логика выбирает то, что вы объяснили.
max_price_udf = udf(lambda prices_list: max(prices_list), IntegerType())
df = spark.createDataFrame([(1, 5, 2019,1,20),(2, 4, 2019,2,18),
(3, 3, 2019,3,21),(4, 2, 2019,4,20),
(5, 1, 2019,5,1),(6, 52, 2018,6,2),
(7, 51, 2018,7,3)], ["product_id", "week", "year","rank","price"])
window = Window.orderBy(col("year").desc(),col("week").desc())
df = df.withColumn("prices_list", array([coalesce(lead(col("price"),x, None).over(window),lead(col("price"),x-3, None).over(window)) for x in range(1, 4)]))
df = df.withColumn("max_price",max_price_udf(col("prices_list")))
df.show()
какие результаты
---------- ---- ---- ---- ----- ------------ ---------
|product_id|week|year|rank|price| prices_list|max_price|
---------- ---- ---- ---- ----- ------------ ---------
| 1| 5|2019| 1| 20|[18, 21, 20]| 21|
| 2| 4|2019| 2| 18| [21, 20, 1]| 21|
| 3| 3|2019| 3| 21| [20, 1, 2]| 20|
| 4| 2|2019| 4| 20| [1, 2, 3]| 3|
| 5| 1|2019| 5| 1| [2, 3, 1]| 3|
| 6| 52|2018| 6| 2| [3, 1, 2]| 3|
| 7| 51|2018| 7| 3| [1, 2, 3]| 3|
---------- ---- ---- ---- ----- ------------ ---------
Вот решение в Scala
var df = Seq((1, 5, 2019, 1, 20), (2, 4, 2019, 2, 18),
(3, 3, 2019, 3, 21), (4, 2, 2019, 4, 20),
(5, 1, 2019, 5, 1), (6, 52, 2018, 6, 2),
(7, 51, 2018, 7, 3)).toDF("product_id", "week", "year", "rank", "price")
val window = Window.orderBy($"year".desc, $"week".desc)
df = df.withColumn("max_price", greatest((for (x <- 1 to 3) yield coalesce(lead(col("price"), x, null).over(window), lead(col("price"), x - 3, null).over(window))):_*))
df.show()
Комментарии:
1. Я обновил вопрос в соответствии с вашим набором данных, ваш ответ кажется правильным, но я не могу преобразовать эту строку в scala ->df = df.withColumn(«prices_list», array([coalesce(lead(col(«цена»), x, None).over(окно),lead(col(«цена»), x-3, Нет).over(окно)) для x в диапазоне (1, 4)]))
2. наконец-то удалось сделать в scala с помощью COLUMN(«maxPrice», array((от 1 до 3).map(e=>coalesce(lead(«цена», e).over(windowSpec),lead(«цена», e-3).over(windowSpec))): _*))
3. Однако нет необходимости использовать udf, для массива можно использовать функцию greatest, чтобы найти максимальное
4. пытался использовать что-то max, но не смог найти, не знал, что доступно наибольшее, и я собирался обновить scala, рад, что вам удалось это получить!!!
Ответ №2:
Вы можете использовать оконные функции SQL в сочетании с greatest() . Когда оконная функция SQL содержит менее 3 строк, вы учитываете текущие строки и даже предыдущие строки. Поэтому вам необходимо, чтобы во внутреннем подзапросе были вычислены значения lag1_price, lag2_price. Во внешнем запросе вы можете использовать значение row_count и использовать функцию greatest(), передав значения lag1, lag2 и current price для соответствующих значений против 2,1,0 и получить максимальное значение.
Проверьте это:
val df = Seq((1, 5, 2019,1,20),(2, 4, 2019,2,18),
(3, 3, 2019,3,21),(4, 2, 2019,4,20),
(5, 1, 2019,5,1),(6, 52, 2018,6,2),
(7, 51, 2018,7,3)).toDF("product_id", "week", "year","rank","price")
df.createOrReplaceTempView("sales")
val df2 = spark.sql("""
select product_id, week, year, price,
count(*) over(order by year desc, week desc rows between 1 following and 3 following ) as count_row,
lag(price) over(order by year desc, week desc ) as lag1_price,
sum(price) over(order by year desc, week desc rows between 2 preceding and 2 preceding ) as lag2_price,
max(price) over(order by year desc, week desc rows between 1 following and 3 following ) as max_price1 from sales
""")
df2.show(false)
df2.createOrReplaceTempView("sales_inner")
spark.sql("""
select product_id, week, year, price,
case
when count_row=2 then greatest(price,max_price1)
when count_row=1 then greatest(price,lag1_price,max_price1)
when count_row=0 then greatest(price,lag1_price,lag2_price)
else max_price1
end as max_price
from sales_inner
""").show(false)
Результаты:
---------- ---- ---- ----- --------- ---------- ---------- ----------
|product_id|week|year|price|count_row|lag1_price|lag2_price|max_price1|
---------- ---- ---- ----- --------- ---------- ---------- ----------
|1 |5 |2019|20 |3 |null |null |21 |
|2 |4 |2019|18 |3 |20 |null |21 |
|3 |3 |2019|21 |3 |18 |20 |20 |
|4 |2 |2019|20 |3 |21 |18 |3 |
|5 |1 |2019|1 |2 |20 |21 |3 |
|6 |52 |2018|2 |1 |1 |20 |3 |
|7 |51 |2018|3 |0 |2 |1 |null |
---------- ---- ---- ----- --------- ---------- ---------- ----------
---------- ---- ---- ----- ---------
|product_id|week|year|price|max_price|
---------- ---- ---- ----- ---------
|1 |5 |2019|20 |21 |
|2 |4 |2019|18 |21 |
|3 |3 |2019|21 |20 |
|4 |2 |2019|20 |3 |
|5 |1 |2019|1 |3 |
|6 |52 |2018|2 |3 |
|7 |51 |2018|3 |3 |
---------- ---- ---- ----- ---------