#pyspark #apache-spark-sql
#pyspark #apache-spark-sql
Вопрос:
У меня есть таблица со следующими данными
и я пытаюсь получить следующий отдельный набор значений (следующие 3 значения), как показано ниже
Я пытался использовать функции lead, но в итоге получил следующий результат
spark.sql("select *,
coalesce(lead(page, 1) over (partition by id order by date_time asc), 'Exit') as next_pagename1,
coalesce(lead(page, 2) over (partition by id order by date_time asc), 'Exit') as next_pagename2,
coalesce(lead(page, 3) over (partition by id order by date_time asc), 'Exit') as next_pagename3,
from temp").show()
Может кто-нибудь, пожалуйста, сообщить мне, чего мне здесь не хватает?
Редактировать:
Обновлены данные примера
Комментарии:
1. Вы должны удалить столбцы с дублирующимися страницами перед запуском sql
2. Спасибо! Это именно то, с чем я борюсь. Есть мысли о том, как я могу это выполнить?
Ответ №1:
Вы можете сгруппировать данные по id
и page
и принять минимальное значение date_time
. После этого вы можете использовать sql, который вы уже предоставили:
spark.sql("""with data as (
select id, page, min(date_time) as date_time from
temp group by id, page)
select data.id, data.page,
coalesce(lead(data.page, 1) over (partition by data.id
order by data.date_time asc), 'Exit') as next_pagename1,
coalesce(lead(data.page, 2) over (partition by data.id
order by data.date_time asc), 'Exit') as next_pagename2,
coalesce(lead(data.page, 3) over (partition by data.id
order by data.date_time asc), 'Exit') as next_pagename3
from data""").show()
Вывод:
--- ----- -------------- -------------- --------------
| id| page|next_pagename1|next_pagename2|next_pagename3|
--- ----- -------------- -------------- --------------
|123|login| page1| page2| page5|
|123|page1| page2| page5| page3|
|123|page2| page5| page3| Exit|
|123|page5| page3| Exit| Exit|
|123|page3| Exit| Exit| Exit|
--- ----- -------------- -------------- --------------
С новыми данными вы можете собрать следующие страницы в массив (вызываемый data
в моем коде), а затем отфильтровать массив ( filtered_data
).
spark.sql("""
with data as (
select page,
array(lead(page, 1)over (partition by id order by date_time asc),
lead(page, 2)over (partition by id order by date_time asc),
lead(page, 3)over (partition by id order by date_time asc),
lead(page, 4)over (partition by id order by date_time asc),
lead(page, 5)over (partition by id order by date_time asc)) as next
from temp),
filtered_data as (
select page,
filter(transform(next, (x,i) -> if(i=0 or x!=next[i-1], x, null)), x -> x=x) as next
from data)
select page,
ifnull(next[0], 'Exit') as next_pagename1,
ifnull(next[1], 'Exit') as next_pagename2,
ifnull(next[2], 'Exit') as next_pagename3
from filtered_data
""").show(truncate=False)
Вывод:
----- -------------- -------------- --------------
|page |next_pagename1|next_pagename2|next_pagename3|
----- -------------- -------------- --------------
|login|page1 |page2 |page5 |
|page1|page2 |page5 |page3 |
|page2|page2 |page5 |page3 |
|page2|page5 |page3 |page2 |
|page5|page3 |page2 |Exit |
|page3|page2 |Exit |Exit |
|page2|page2 |Exit |Exit |
|page2|Exit |Exit |Exit |
----- -------------- -------------- --------------
При сборе данных в первом массиве я использую «предварительный просмотр», равный 5. При необходимости это число может быть увеличено путем добавления дополнительных элементов в массив.
Комментарии:
1. Спасибо @werner! К сожалению, ваш код не будет работать там, где позже появятся записи для той же страницы. Пожалуйста, смотрите обновленные примеры в моем посте.
2. @kkumar Я добавил другое возможное решение