#sql #apache-spark #pyspark #apache-spark-sql
#sql #apache-spark #pyspark #apache-spark-sql
Вопрос:
Давайте рассмотрим следующие входные данные
| incremental_id | session_start_id | session_end_id | items_bought |
|----------------|------------------|----------------|--------------|
| 1 | a | b | 1 |
| 2 | z | t | 7 |
| 3 | b | c | 0 |
| 4 | c | d | 3 |
Где:
- Каждая строка представляет сеанс пользователя
- Каждый сеанс записывает идентификатор начального / конечного сеанса
- Мы знаем, что первые 3 строки связаны с одним и тем же пользователем, потому
session_end_id = session_start_id
что . Вместо этого 4-я строка связана со вторым пользователем
Я хочу иметь возможность агрегировать вышеуказанные данные, чтобы я мог получить:
- Первый клиент купил 4 товара
- Второй клиент купил 7 товаров
Как это можно сделать в PySpark (или, в конечном итоге, в чистом SQL)? Я бы хотел избежать использования UDFS в PySpark, но это нормально, если это единственный способ.
Спасибо за вашу помощь!
Редактировать: я обновил пример фрейма данных, incremental_id
один не может использоваться для упорядочивания строк как последовательных сеансов
Комментарии:
1. AFAI может видеть, что две записи в последовательности могут быть связаны путем извлечения значения из предыдущей строки, но я не вижу, как 3 записи могут быть связаны только в запросе. Я ожидаю, что потребуется UDF.
2. @jxc Я думаю, что b.a. не так. Скорее
5,d,e,1
я предполагаю.3. похоже на типичный вопрос с использованием graphframe.connectedComponents . или, если образец является только частью group, используйте pandas_udf с тем же методом в networkx.
Ответ №1:
Общие табличные выражения являются частью SQL: 1999.
Используя CTE, мы можем использовать приведенный ниже запрос
WITH cte(session_start_id, session_end_id, items_bought) AS (
select session_start_id, session_end_id, items_bought from user_session where session_start_id not in (
select session_end_id from user_session)
UNION ALL
select a.session_start_id, b.session_end_id, b.items_bought from cte a
inner join user_session b on a.session_end_id = b.session_start_id)
select session_start_id, sum(items_bought) from cte group by (session_start_id)
Объяснение:
- В запросе привязки выберите все записи, у которых нет родительского элемента. (т. Е. Никакие другие записи не заканчиваются текущим идентификатором session_start_id)
- Рекурсивно соедините идентификатор session_end_id cte с идентификатором session_start_id из таблицы.
- Сгруппируйте записи и верните результат.
Ссылка на скрипку SQL: http://sqlfiddle.com/#!4/ac98a/4/0
(Примечание: Используется Oracle в fiddle. Но любой движок БД, поддерживающий CTE, должен работать).
Комментарии:
1. Спасибо, кажется, это работает, но, к сожалению, я использую prestodb, и конкретная версия не поддерживает рекурсивные операторы WITH в качестве предложенного вами запроса. Поэтому, к сожалению, я не могу использовать предложенный вами подход
Ответ №2:
Вот версия PySpark
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.types import *
# create a window over the full data so we can lag the session end id
win = Window().partitionBy().orderBy("incremental_id")
# This is logic to indicate a user change
df = df.withColumn('user_boundary', F.lag(F.col("session_end_id"), 1).over(win) != F.col("session_start_id"))
df = df.withColumn('user_boundary', F.when(F.col("user_boundary").isNull(), F.lit(False)).otherwise(F.col("user_boundary")))
# Now create an artificial user id
df = df.withColumn('user_id', F.sum(F.col("user_boundary").cast(IntegerType())).over(win))
# Aggregate
df.groupby('user_id').agg(F.sum(F.col("items_bought")).alias("total_bought")).show()
------- ------------
|user_id|total_bought|
------- ------------
| 0| 4|
| 1| 7|
------- ------------
Комментарии:
1. Спасибо, но, к сожалению, это решение с задержкой (1) использует предположение, которое
incremental_id
указывает на последующие сеансы от одного и того же пользователя. Я добавил это просто для контекста, но это должно работать независимо от этого. Я обновляю пример фрейма данных, чтобы сделать это более явным
Ответ №3:
Если вы можете получить доступ к созданию временной таблицы и затронутым метаданным количества строк, вы можете перенести это:
insert into #CTESubs
select
session_start_id,
session_end_id,
items_bought
from #user_session
WHERE
session_start_id not in (select session_end_id from #user_session)
while(@@ROWCOUNT <> 0)
begin
insert into #CTESubs
select distinct
p.session_start_id,
c.session_end_id,
c.items_bought
from #user_session c
inner join #CTESubs p on c.session_start_id = p.session_end_id
WHERE
p.session_start_id not in (select session_end_id from #user_session)
and c.session_end_id not in (select session_end_id from #CTESubs)
end
select
session_start_id,
sum(items_bought) items_bought
from #CTESubs
group by
session_start_id;