Как заставить механизм Кафки Clickhouse проверять длину вложенных столбцов

#clickhouse

#клик-хаус

Вопрос:

У меня есть сервер Clickhouse с Engine=Kafka таблицей с вложенными полями и kafka_handle_error_mode='stream',input_format_import_nested_json=1 настройками и двумя материализованными представлениями:

  • один для _error='' случая, который хранит данные в нижележащей таблице с той же структурой, Engine=Kafka что и таблица
  • тот, для _error!='' которого хранятся необработанные сообщения и ошибки в случае «неправильных» данных

Проблема в том, что когда clickhouse получает сообщение от кафки с разной длиной вложенных столбцов (например {"n":{"a":["1","2"], "b":["3"]}} ), оно проходит через Engine=Kafka таблицу без создания _error и застревает на вставке таблицы (и зависает весь цикл сохранения), потому что таблица Кафки не проверяет длину вложенных столбцов, но целевая таблица проверяет.

Существует flatten_nested=0 настройка, которая, по-видимому, изменяет вложенное поведение, но требует другой структуры json, что неприемлемо для моего случая. Есть ли обходной путь для этого?

Ответ №1:

  1. Механизм Кафки не проверяет размеры вложенных массивов, потому что это ограничение таблицы MergeTree.
  2. Структура движка Kafka не обязательно должна совпадать с таблицей MergeTree. Просто добавьте соответствующее преобразование / проверьте ВЫБОР материализованного представления.

Пример:

 create table n ( a Nested( n1 int, n2 int ) ) Engine=Kafka ....; create table m ( an1 Array(int), an2 Array(int) ) Engine = MergeTree order by tuple();  create materialized view m_mv to m as select   a.n1 as an1,   a.n2 as an2 from n;  

Комментарии:

1. Такое ощущение, что должна быть настройка для включения проверки

2. Проверка пользовательских размеров массива в материализованном представлении выполняет эту работу. Интересно, влияет ли это на производительность вставки?

3. @YngveStardust Не влияет на производительность. Единственное различие между вложенным и массивом в этом случае-точка «.» в имени: «Массив n1(int)» против «Массива an1(int)».

4. Существование этой проверки-философский вопрос. В основном CH избегает любых проверок по соображениям производительности. Я всегда задавался вопросом, почему эта проверка вообще существует (в других местах CH позволяет сохранять мусор в таблице, почему бы и нет в данном случае).

Ответ №2:

Мне также интересно знать, есть ли чистый/эффективный способ решить эту проблему.

Предложение, которое могло бы соответствовать вашему варианту использования, состояло бы в том, чтобы сохранить строку в таблице Кафки. В материализованном представлении вы можете использовать функции Json и фильтровать ошибки …

 CREATE TABLE kafka(  payload String  ) engine = Kafka SETTINGS kafka_broker_list = 'localhost:9092',  kafka_topic_list = 'nested',  kafka_group_name = 'nested',  kafka_format = 'JSONAsString',  kafka_num_consumers = 1,  kafka_handle_error_mode = 'stream';  create materialized view consumer  to valid_payload_table  as select  JSONExtract(payload, 'a', 'String') as a,   JSONExtract(payload, 'b', 'String') as b,  from kafka  where _error!=''   

Комментарии:

1. Я думаю, что решение JSONAsString не подходит для моего случая. Если у вас есть перечисление в целевой таблице, вам необходимо вручную проверить наличие ограничений перечисления в предложении where. Помимо необходимости проверки размеров массива.