Вычисление пиковых значений в секунду после суммирования отдельных значений в clickhouse

#sql #replication #distributed #clickhouse

#sql #репликация #распределенный #clickhouse

Вопрос:

В настоящее время я использую Clickhouse cluster (2 фрагмента, 2 реплики) для чтения журналов транзакций со своего сервера. Журнал содержит такие поля, как временная метка, доставленные байты, ttms и т.д. Структура моей таблицы выглядит следующим образом:

 CREATE TABLE db.log_data_local ON CLUSTER '{cluster}' (
  timestamp DateTime,
  bytes UInt64,
  /*lots of other fields */
  ) ENGINE = ReplicatedMergeTree('/clickhouse/{cluster}/db/tables/logs/{shard}','{replica}')
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY timestamp
TTL timestamp   INTERVAL 1 MONTH;

CREATE TABLE db.log_data ON CLUSTER '{cluster}'
AS cdn_data.http_access_data_local
ENGINE = Distributed('{cluster}','db','log_data_local',rand());
  

Я получаю данные из Kafka и использую materialized view для заполнения этой таблицы. Теперь мне нужно рассчитать пиковую пропускную способность в секунду из этой таблицы. Итак, в основном мне нужно суммировать поле байтов в секунду, а затем найти максимальное значение за 5-минутный период.

Я пытался использовать ReplicatedAggregatingMergeTree с агрегатными функциями для пропускной способности, но получаемое пиковое значение намного меньше по сравнению со значением, которое я получаю, когда я напрямую запрашиваю необработанную таблицу.

Проблема в том, что при создании представления материала для заполнения пиковых значений прямой запрос распределенной таблицы не дает никаких результатов, но если я запрашиваю локальную таблицу, то учитывается только частичный набор данных. Я попытался использовать промежуточную таблицу для вычисления общего количества в секунду, а затем для создания materialized, но я столкнулся с той же проблемой.

Это схема для моей таблицы пиков и материализованного представления, которое я пытаюсь создать:

 CREATE TABLE db.peak_metrics_5m_local ON CLUSTER '{cluster}'
(
  timestamp DateTime,
  peak_throughput AggregateFunction(max,UInt64),
)
ENGINE=ReplicatedAggregatingMergeTree('/clickhouse/{cluster}/db/tables/peak_metrics_5m_local/{shard}','{replica}')
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (timestamp)
TTL timestamp   toIntervalDay(90);


CREATE TABLE db.peak_metrics_5m ON CLUSTER '{cluster}'
AS cdn_data.peak_metrics_5m_local
ENGINE = Distributed('{cluster}','db','peak_metrics_5m_local',rand());

CREATE MATERIALIZED VIEW db.peak_metrics_5m_mv ON CLUSTER '{cluster}'
TO db.peak_metrics_5m_local
AS SELECT
        toStartOfFiveMinute(timestamp) as timestamp,
        maxState(bytes) as peak_throughput,
    FROM (
        SELECT 
            timestamp,
            sum(bytes) as bytes,
        FROM db.log_data_local
        GROUP BY timestamp
    )
    GROUP BY timestamp;
  

Пожалуйста, помогите мне с решением этой проблемы.

Комментарии:

1. maxState (байты) * 8 как peak_throughput выглядит довольно подозрительно из-за умножения AggregationState и number. Необходимо применить умножение либо в подзапросе, где вычисляется сумма, либо снаружи. Или рассмотрите возможность использования SimpleAggregateFunction вместо AggregateFunction.

2. Извините. Это было то, что я пробовал. Теперь отредактировали запрос MV.

Ответ №1:

Это невозможно реализовать с помощью MV. MV — это триггер вставки.

sum(bytes) as bytes, ... GROUP BY timestamp работает со вставленным буфером и не считывает данные из log_data_local таблицы.

https://github.com/ClickHouse/ClickHouse/issues/14266#issuecomment-684907869