#clickhouse
#клик-хаус
Вопрос:
У меня есть сервер Clickhouse с Engine=Kafka
таблицей с вложенными полями и kafka_handle_error_mode='stream',input_format_import_nested_json=1
настройками и двумя материализованными представлениями:
- один для
_error=''
случая, который хранит данные в нижележащей таблице с той же структурой,Engine=Kafka
что и таблица - тот, для
_error!=''
которого хранятся необработанные сообщения и ошибки в случае «неправильных» данных
Проблема в том, что когда clickhouse получает сообщение от кафки с разной длиной вложенных столбцов (например {"n":{"a":["1","2"], "b":["3"]}}
), оно проходит через Engine=Kafka
таблицу без создания _error и застревает на вставке таблицы (и зависает весь цикл сохранения), потому что таблица Кафки не проверяет длину вложенных столбцов, но целевая таблица проверяет.
Существует flatten_nested=0
настройка, которая, по-видимому, изменяет вложенное поведение, но требует другой структуры json, что неприемлемо для моего случая. Есть ли обходной путь для этого?
Ответ №1:
- Механизм Кафки не проверяет размеры вложенных массивов, потому что это ограничение таблицы MergeTree.
- Структура движка Kafka не обязательно должна совпадать с таблицей MergeTree. Просто добавьте соответствующее преобразование / проверьте ВЫБОР материализованного представления.
Пример:
create table n ( a Nested( n1 int, n2 int ) ) Engine=Kafka ....; create table m ( an1 Array(int), an2 Array(int) ) Engine = MergeTree order by tuple(); create materialized view m_mv to m as select a.n1 as an1, a.n2 as an2 from n;
Комментарии:
1. Такое ощущение, что должна быть настройка для включения проверки
2. Проверка пользовательских размеров массива в материализованном представлении выполняет эту работу. Интересно, влияет ли это на производительность вставки?
3. @YngveStardust Не влияет на производительность. Единственное различие между вложенным и массивом в этом случае-точка «.» в имени: «Массив n1(int)» против «Массива an1(int)».
4. Существование этой проверки-философский вопрос. В основном CH избегает любых проверок по соображениям производительности. Я всегда задавался вопросом, почему эта проверка вообще существует (в других местах CH позволяет сохранять мусор в таблице, почему бы и нет в данном случае).
Ответ №2:
Мне также интересно знать, есть ли чистый/эффективный способ решить эту проблему.
Предложение, которое могло бы соответствовать вашему варианту использования, состояло бы в том, чтобы сохранить строку в таблице Кафки. В материализованном представлении вы можете использовать функции Json и фильтровать ошибки …
CREATE TABLE kafka( payload String ) engine = Kafka SETTINGS kafka_broker_list = 'localhost:9092', kafka_topic_list = 'nested', kafka_group_name = 'nested', kafka_format = 'JSONAsString', kafka_num_consumers = 1, kafka_handle_error_mode = 'stream'; create materialized view consumer to valid_payload_table as select JSONExtract(payload, 'a', 'String') as a, JSONExtract(payload, 'b', 'String') as b, from kafka where _error!=''
Комментарии:
1. Я думаю, что решение JSONAsString не подходит для моего случая. Если у вас есть перечисление в целевой таблице, вам необходимо вручную проверить наличие ограничений перечисления в предложении where. Помимо необходимости проверки размеров массива.