Как агрегировать значения по разным столбцам в PySpark (или, в конечном итоге, SQL)?

#sql #apache-spark #pyspark #apache-spark-sql

#sql #apache-spark #pyspark #apache-spark-sql

Вопрос:

Давайте рассмотрим следующие входные данные

 | incremental_id | session_start_id | session_end_id | items_bought |
|----------------|------------------|----------------|--------------|
| 1              | a                | b              | 1            |
| 2              | z                | t              | 7            |
| 3              | b                | c              | 0            |
| 4              | c                | d              | 3            |
 

Где:

  • Каждая строка представляет сеанс пользователя
  • Каждый сеанс записывает идентификатор начального / конечного сеанса
  • Мы знаем, что первые 3 строки связаны с одним и тем же пользователем, потому session_end_id = session_start_id что . Вместо этого 4-я строка связана со вторым пользователем

Я хочу иметь возможность агрегировать вышеуказанные данные, чтобы я мог получить:

  • Первый клиент купил 4 товара
  • Второй клиент купил 7 товаров

Как это можно сделать в PySpark (или, в конечном итоге, в чистом SQL)? Я бы хотел избежать использования UDFS в PySpark, но это нормально, если это единственный способ.

Спасибо за вашу помощь!

Редактировать: я обновил пример фрейма данных, incremental_id один не может использоваться для упорядочивания строк как последовательных сеансов

Комментарии:

1. AFAI может видеть, что две записи в последовательности могут быть связаны путем извлечения значения из предыдущей строки, но я не вижу, как 3 записи могут быть связаны только в запросе. Я ожидаю, что потребуется UDF.

2. @jxc Я думаю, что b.a. не так. Скорее 5,d,e,1 я предполагаю.

3. похоже на типичный вопрос с использованием graphframe.connectedComponents . или, если образец является только частью group, используйте pandas_udf с тем же методом в networkx.

Ответ №1:

Общие табличные выражения являются частью SQL: 1999.

Используя CTE, мы можем использовать приведенный ниже запрос

 WITH cte(session_start_id, session_end_id, items_bought) AS (
  select session_start_id, session_end_id, items_bought from user_session where session_start_id not in (
    select session_end_id from user_session)
UNION ALL
select a.session_start_id, b.session_end_id, b.items_bought from cte a 
  inner join user_session b on a.session_end_id = b.session_start_id)
  select session_start_id, sum(items_bought) from cte group by (session_start_id)
 

Объяснение:

  • В запросе привязки выберите все записи, у которых нет родительского элемента. (т. Е. Никакие другие записи не заканчиваются текущим идентификатором session_start_id)
  • Рекурсивно соедините идентификатор session_end_id cte с идентификатором session_start_id из таблицы.
  • Сгруппируйте записи и верните результат.

Ссылка на скрипку SQL: http://sqlfiddle.com/#!4/ac98a/4/0
(Примечание: Используется Oracle в fiddle. Но любой движок БД, поддерживающий CTE, должен работать).

Комментарии:

1. Спасибо, кажется, это работает, но, к сожалению, я использую prestodb, и конкретная версия не поддерживает рекурсивные операторы WITH в качестве предложенного вами запроса. Поэтому, к сожалению, я не могу использовать предложенный вами подход

Ответ №2:

Вот версия PySpark

 from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.types import *

# create a window over the full data so we can lag the session end id
win = Window().partitionBy().orderBy("incremental_id")

# This is logic to indicate a user change
df = df.withColumn('user_boundary', F.lag(F.col("session_end_id"), 1).over(win) != F.col("session_start_id"))
df = df.withColumn('user_boundary', F.when(F.col("user_boundary").isNull(), F.lit(False)).otherwise(F.col("user_boundary")))

# Now create an artificial user id
df = df.withColumn('user_id', F.sum(F.col("user_boundary").cast(IntegerType())).over(win))

# Aggregate
df.groupby('user_id').agg(F.sum(F.col("items_bought")).alias("total_bought")).show()

 ------- ------------ 
|user_id|total_bought|
 ------- ------------ 
|      0|           4|
|      1|           7|
 ------- ------------ 

 

Комментарии:

1. Спасибо, но, к сожалению, это решение с задержкой (1) использует предположение, которое incremental_id указывает на последующие сеансы от одного и того же пользователя. Я добавил это просто для контекста, но это должно работать независимо от этого. Я обновляю пример фрейма данных, чтобы сделать это более явным

Ответ №3:

Если вы можете получить доступ к созданию временной таблицы и затронутым метаданным количества строк, вы можете перенести это:

 insert into #CTESubs
select
    session_start_id,
    session_end_id,
    items_bought
from #user_session
WHERE
    session_start_id not in (select session_end_id from #user_session)

while(@@ROWCOUNT <> 0)
begin
    insert into #CTESubs
    select distinct
        p.session_start_id,
        c.session_end_id,
        c.items_bought
    from #user_session c
        inner join #CTESubs p on c.session_start_id = p.session_end_id
    WHERE
        p.session_start_id not in (select session_end_id from #user_session) 
        and c.session_end_id not in (select session_end_id from #CTESubs)
end

select
    session_start_id,
    sum(items_bought) items_bought
from #CTESubs
group by 
    session_start_id;