#sql #replication #distributed #clickhouse
#sql #репликация #распределенный #clickhouse
Вопрос:
В настоящее время я использую Clickhouse cluster (2 фрагмента, 2 реплики) для чтения журналов транзакций со своего сервера. Журнал содержит такие поля, как временная метка, доставленные байты, ttms и т.д. Структура моей таблицы выглядит следующим образом:
CREATE TABLE db.log_data_local ON CLUSTER '{cluster}' (
timestamp DateTime,
bytes UInt64,
/*lots of other fields */
) ENGINE = ReplicatedMergeTree('/clickhouse/{cluster}/db/tables/logs/{shard}','{replica}')
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY timestamp
TTL timestamp INTERVAL 1 MONTH;
CREATE TABLE db.log_data ON CLUSTER '{cluster}'
AS cdn_data.http_access_data_local
ENGINE = Distributed('{cluster}','db','log_data_local',rand());
Я получаю данные из Kafka и использую materialized view для заполнения этой таблицы. Теперь мне нужно рассчитать пиковую пропускную способность в секунду из этой таблицы. Итак, в основном мне нужно суммировать поле байтов в секунду, а затем найти максимальное значение за 5-минутный период.
Я пытался использовать ReplicatedAggregatingMergeTree с агрегатными функциями для пропускной способности, но получаемое пиковое значение намного меньше по сравнению со значением, которое я получаю, когда я напрямую запрашиваю необработанную таблицу.
Проблема в том, что при создании представления материала для заполнения пиковых значений прямой запрос распределенной таблицы не дает никаких результатов, но если я запрашиваю локальную таблицу, то учитывается только частичный набор данных. Я попытался использовать промежуточную таблицу для вычисления общего количества в секунду, а затем для создания materialized, но я столкнулся с той же проблемой.
Это схема для моей таблицы пиков и материализованного представления, которое я пытаюсь создать:
CREATE TABLE db.peak_metrics_5m_local ON CLUSTER '{cluster}'
(
timestamp DateTime,
peak_throughput AggregateFunction(max,UInt64),
)
ENGINE=ReplicatedAggregatingMergeTree('/clickhouse/{cluster}/db/tables/peak_metrics_5m_local/{shard}','{replica}')
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (timestamp)
TTL timestamp toIntervalDay(90);
CREATE TABLE db.peak_metrics_5m ON CLUSTER '{cluster}'
AS cdn_data.peak_metrics_5m_local
ENGINE = Distributed('{cluster}','db','peak_metrics_5m_local',rand());
CREATE MATERIALIZED VIEW db.peak_metrics_5m_mv ON CLUSTER '{cluster}'
TO db.peak_metrics_5m_local
AS SELECT
toStartOfFiveMinute(timestamp) as timestamp,
maxState(bytes) as peak_throughput,
FROM (
SELECT
timestamp,
sum(bytes) as bytes,
FROM db.log_data_local
GROUP BY timestamp
)
GROUP BY timestamp;
Пожалуйста, помогите мне с решением этой проблемы.
Комментарии:
1. maxState (байты) * 8 как peak_throughput выглядит довольно подозрительно из-за умножения AggregationState и number. Необходимо применить умножение либо в подзапросе, где вычисляется сумма, либо снаружи. Или рассмотрите возможность использования SimpleAggregateFunction вместо AggregateFunction.
2. Извините. Это было то, что я пробовал. Теперь отредактировали запрос MV.
Ответ №1:
Это невозможно реализовать с помощью MV. MV — это триггер вставки.
sum(bytes) as bytes, ... GROUP BY timestamp
работает со вставленным буфером и не считывает данные из log_data_local таблицы.
https://github.com/ClickHouse/ClickHouse/issues/14266#issuecomment-684907869