Среднее значение для потока данных SQL

#sql #postgresql #pipeline #moving-average

#sql #postgresql #конвейер #скользящее среднее

Вопрос:

Как мне изменить приведенный ниже код, чтобы добавить среднее значение для каждой группы без:

  1. что-либо в запросе, которое заставило бы выполнять несколько проходов (например, группировать по или разделять по)
  2. потенциальное переполнение вычисляет среднее значение

Группа кортежей — это все смежные кортежи по идентификатору с одинаковым значением grp, поэтому среднее значение показателя в его группе должно вычисляться таким образом, чтобы запрос выполнялся чисто конвейерным способом, требующим сканирования таблицы только один раз.

Пример вывода приведен ниже.

 -- TABLE Stream
echo -- creating table "Stream"

drop table if exists Stream;
create table Stream (
    id      int,
    grp     int,
    measure int,
    constraint streamPK
        primary key (id),
    constraint idNotNeg
        check (id >= 0),
    constraint grpNotNeg
        check (grp >= 0)
);

-- ---------------------------------------------------------------------------
-- POPULATE: add some tuples to table Stream
echo -- populating "Stream"

insert into Stream (id, grp, measure)
values
    ( 0, 0,  2),
    ( 1, 0,  3),
    ( 2, 1,  5),
    ( 3, 1,  7),
    ( 4, 1, 11),
    ( 5, 0, 13),
    ( 6, 0, 17),
    ( 7, 0, 19),
    ( 8, 0, 23),
    ( 9, 2, 29),
    (10, 2, 31),
    (11, 5, 37),
    (12, 3, 41),
    (13, 3, 43);

echo -- creating composite type "intRec"

drop type if exists
    intRec
    cascade;
create type intRec as (
    number  int,
    restart boolean
);

-- ---------------------------------------------------------------------------
-- runningSum_state : accumulator function
echo -- creating function "runningSum_state"

drop function if exists
    runningSum_state(int, intRec)
    cascade;
create function runningSum_state(int, intRec)
returns int
language plpgsql
as $f$
    declare i alias for $1;
    declare a alias for $2;
    declare j int;
    begin
        if a.restart or i is null then
            j := a.number;
        elsif a.number is null then
            j := i;
        else
            j := a.number   i;
        end if;
        return j;
    end
$f$;

-- ---------------------------------------------------------------------------
-- runningSum_final : returns the aggregate value
echo -- creating function "runningSum_final"

drop function if exists
    runningSum_final(int)
    cascade;
create function runningSum_final(int)
returns intRec
language sql
as $f$
    select cast(($1, false) as intRec);
$f$;

-- ---------------------------------------------------------------------------
-- runningSum : the aggregate function
echo -- creating aggregate function "runningSum"

drop aggregate if exists
    runningSum(intRec)
    cascade;
create aggregate runningSum(intRec) (
    sfunc     = runningSum_state,
    stype     = int,
    finalfunc = runningSum_final
);

-- ---------------------------------------------------------------------------
-- pipeline sliging-window query that uses our agggregate function
echo -- querying "Stream" with running sum

with
    -- look at the neighbour tuple to the left to fetch its grp value
    CellLeft (id, grp, measure, lft) as (
        select  id,
                grp,
                measure,
                coalesce(
                    max(grp) over (
                        order by id
                        rows between
                        1 preceding
                            and
                        1 preceding ),
                    -1 )
        from Stream
    ),
    -- determine whether current tuple is start of a group
    CellStart(id, grp, measure, start) as (
        select  id,
                grp,
                measure,
                cast(
                    case
                    when grp = lft then 0
                    else                1
                    end
                as boolean)
        from CellLeft
    ),
    -- bundle the measure and start-flag into an intRC
    CellFlag(id, grp, intRC) as (
        select  id,
                grp,
                cast((measure, start) as intRec)
        from CellStart
    ),
    -- call our runningSum aggregator
    CellRun(id, grp, measure, runningRC) as (
        select  id,
                grp,
                (intRC).number,
                runningSum(intRC)
                    over (order by id)
        from CellFlag
    ),
    -- extract the running sum from the composite
    CellAggr(id, grp, measure, running) as (
        select  id,
                grp,
                measure,
                (runningRC).number
        from CellRun
    )
-- report
select id, grp, measure, running
from CellAggr
order by id;
 

Пример вывода

  id | grp | measure |     average      
---- ----- --------- ------------------
  0 |   0 |       2 |                2
  1 |   0 |       3 |              2.5
  2 |   1 |       5 |                5
  3 |   1 |       7 |                6
  4 |   1 |      11 | 7.66666666666667
  5 |   0 |      13 |               13
  6 |   0 |      17 |               15
  7 |   0 |      19 | 16.3333333333333
  8 |   0 |      23 |               18
  9 |   2 |      29 |               29
 10 |   2 |      31 |               30
 11 |   5 |      37 |               37
 12 |   3 |      41 |               41
 13 |   3 |      43 |               42
 

(14 строк)

Комментарии:

1. Не вандализируйте свои собственные сообщения. Гордон — очень щедрый участник. Если у вас возникли проблемы с поддержкой, которую вы получили, пожалуйста, запросите его для продолжения поддержки.

2. Пожалуйста, не создавайте больше работы для других людей, вандализируя ваши сообщения. Размещая в сети Stack Exchange, вы предоставляете Stack Exchange не подлежащее отзыву право в соответствии с лицензией CC BY-SA 4.0 на распространение этого контента (т. Е. Независимо от ваших будущих решений). Согласно политике обмена стеками, распространяется версия post, не подвергшаяся вандализму. Таким образом, любой вандализм будет отменен. Если вы хотите узнать больше об удалении записи, пожалуйста, смотрите: Как работает удаление?

Ответ №1:

В SQL вы бы выразили это как:

 select t.*,
       avg(measure) over (partition by grp order by id) as group_running_avg
from t;
 

Вы должны доверять плану выполнения, но вы можете помочь ему вместе с индексом (grp, id, measure) .

Комментарии:

1. Не могли бы вы подробнее остановиться на этом? Спасибо

2. @torzihp . . . SQL не является процедурным языком. Вы должны зависеть от оптимизатора, чтобы выбрать наилучший путь выполнения.