Может ли Spark SQL ссылаться на первую строку предыдущего окна / группы?

#sql #apache-spark #apache-spark-sql #window-functions

#sql #apache-spark #apache-spark-sql #окно-функции

Вопрос:

У меня есть своего рода поток событий, который выглядит следующим образом:

 Time UserId SessionId EventType EventData
1    2      A         Load      /a ...
2    1      B         Impressn  X ...
3    2      A         Impressn  Y ...
4    1      B         Load      /b ...
5    2      A         Load      /info ...
6    1      B         Load      /about ...
7    2      A         Impressn  Z ...
 

На практике у пользователей может быть много сеансов в течение больших временных интервалов, и есть также тип события щелчка, но здесь все просто, я пытаюсь увидеть загрузки (просмотры страниц), которые приводят к следующей загрузке, а также какие показы произошли в совокупности.

Итак, без SQL я загрузил это, сгруппировал по пользователям, упорядочил по времени и для каждого сеанса пометил каждую строку информацией о предыдущей загрузке (если таковая имеется). С помощью

 val outDS = logDataset.groupByKey(_.UserId)
      .flatMapGroups((_, iter) => gather(iter))
 

где gather сортирует iter по времени (может быть избыточным, поскольку входные данные сортируются по времени), затем выполняет итерацию по последовательности, устанавливает lastLoadData в значение null при каждом новом сеансе, добавляет lastLoadData к каждой строке и обновляет lastLoadData к данным этой строки, если строка является типом загрузки. Создание чего-то вроде:

 Time UserId SessionId EventType EventData  LastLoadData
1    2      A         Load      / ...      null
2    1      B         Impressn  X ...      null
3    2      A         Impressn  Y ...      / ...
4    1      B         Load      / ...      null
5    2      A         Load      /info ...  / ...
6    1      B         Load      /about ... / ...
7    2      A         Impressn  Z ...      /info ...
 

Что позволяет мне затем агрегировать, какие (просмотры страниц) нагрузки приводят к каким другим нагрузкам, или на каждой (странице) загрузке, каковы 5 лучших событий Impressn.

     outDS.createOrReplaceTempView(tempTable)
    val journeyPageViews = sparkSession.sql(
      s"""SELECT lastLoadData, EventData,
         | count(distinct UserId) as users,
         | count(distinct SessionId) as sessions
         |FROM ${tempTable}
         |WHERE EventType='Load'
         |GROUP BY lastLoadData, EventData""".stripMargin)
 

Но у меня такое ощущение, что добавление столбца lastLoadData также может быть выполнено с использованием Spark SQL windows, однако я зависаю на двух частях этого:

  1. Если я создам окно поверх идентификатора пользователя идентификатора сеанса, упорядоченного по времени, как оно будет применяться ко всем событиям, но посмотрите на предыдущее событие загрузки? (НАПРИМЕР, Impressn получит новый столбец lastLoadData, присвоенный предыдущим EventData этого окна)
  2. Если я каким-то образом создам новое окно для каждого события загрузки сеанса (также не уверен, как), событие загрузки в начале окна (предположительно «first») должно получить lastLoadData «first» предыдущего окна, так что, вероятно, это тоже неправильный способ сделать это.

Ответ №1:

Вы можете замаскировать данные, которые не Load null используются case when , и получить LastLoadData using last с ignorenull помощью set to true :

 logDataset.createOrReplaceTempView("table")
val logDataset2 = spark.sql("""
select 
    *,
    last(case when EventType = 'Load' then EventData end, true) 
        over (partition by UserId, SessionId
              order by Time
              rows between unbounded preceding and 1 preceding) LastLoadData 
from table
order by time
""")

logDataset2.show
 ---- ------ --------- --------- ---------- ------------ 
|Time|UserId|SessionId|EventType| EventData|LastLoadData|
 ---- ------ --------- --------- ---------- ------------ 
|   1|     2|        A|     Load|    /a ...|        null|
|   2|     1|        B| Impressn|     X ...|        null|
|   3|     2|        A| Impressn|     Y ...|      /a ...|
|   4|     1|        B|     Load|    /b ...|        null|
|   5|     2|        A|     Load| /info ...|      /a ...|
|   6|     1|        B|     Load|/about ...|      /b ...|
|   7|     2|        A| Impressn|     Z ...|   /info ...|
 ---- ------ --------- --------- ---------- ------------