#apache-kafka #confluent-platform #ksqldb
#apache-kafka #платформа confluent #ksqldb
Вопрос:
Я пытаюсь создать таблицу kafka с использованием (Confluent) ksqldb-сервера через его интерфейс REST, используя следующий код (скрипт bash):
KSQLDB_COMMAND="CREATE TABLE sample_table
(xkey VARCHAR,
xdata VARCHAR)
WITH (KAFKA_TOPIC='sample-topic',
VALUE_FORMAT='JSON',
KEY='xkey'); "
COMMAND="curl -X 'POST' '$KSQLDB_SERVER'
-H 'Content-Type: application/vnd.ksql.v1 json; charset=utf-8'
-d '{ "ksql": "$KSQLDB_COMMAND" }' "
eval $COMMAND
Возвращается следующее сообщение об ошибке:
{"@type":"statement_error","error_code":40001,"message":"Failed to prepare statement: Invalid config variable(s) in the WITH clause: KEY","statementText":"CREATE TABLE sample_table (xkey VARCHAR, xdata VARCHAR) WITH (KAFKA_TOPIC='sample-topic', VALUE_FORMAT='JSON', KEY='xkey');","entities":[]}%
Ошибка указывает на ошибку в фактическом операторе, в частности, с ключевым атрибутом.
Я могу получить базовые команды («ПОТОКИ СПИСКОВ» и т. Д.), Работающие с использованием интерфейса REST, Но не могу создавать таблицы, поэтому я полагаю, что это проблема в инструкции KSQL или в том, как я создаю команду bash (в переменной «COMMAND»).).
Любая помощь приветствуется.
Комментарии:
1. Это просто строковая переменная bash. Пожалуйста, покажите ваш фактический вызов REST
2. Мои извинения… Я добавил выполнение команды…
3. Согласно документам, вы должны поместить
PRIMARY KEY
в столбец, а не в WITH docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference /…4. Проверил документы по предоставленной вами ссылке… в примере внизу размещенной страницы есть предложение «WITH», так что, насколько я вижу, это не проблема … при этом я попытался добавить ПЕРВИЧНЫЙ КЛЮЧ, как вы указали, и получил идентичную ошибку.
5. Ни один пример на этой странице не вставляет
KEY
тело WITH, поскольку ошибка, которую вы получаете, указывает на проблему, а не на ваш синтаксис bash
Ответ №1:
Я потратил немало времени на эксперименты и получил этот простой пример (моя первоначальная попытка потребовала слишком большого количества замен переменных bash, чтобы сделать ее полезной / поддерживаемой, поэтому эта версия немного упрощена). Я также обнаружил, что имена таблиц KSQLDB должны соответствовать обычным соглашениям об именовании SQL для имен таблиц (т.е. альфа, подчеркивания и т.д… но нет дефисов, что вызвало кучу ошибок в моем первоначальном вопросе… Я должен был более внимательно прочитать документацию).
Работает следующее (возможно, вам потребуется изменить адрес вашего сервера KSQLDB)… и с минимальными изменениями может быть выполнена практически любая команда KSQLDB:
####
# NOTE: table MUST be alpha (underscores are OK)... hyphens are not allowed
####
KSQLDB_SERVER="http://localhost:8088/ksql"
KSQLDB_TABLE="some_table"
KSQLDB_TOPIC="some_topic"
VALUE_FORMAT="JSON"
FMT="{ "ksql": "CREATE TABLE %s (key VARCHAR PRIMARY KEY, data VARCHAR) WITH (KAFKA_TOPIC='%s', VALUE_FORMAT='%s');" }"
JSON_DATA=$(printf "$FMT" "$KSQLDB_TABLE" "$KSQLDB_TOPIC" "$VALUE_FORMAT")
curl -X "POST" "$KSQLDB_SERVER"
-H "Content-Type: application/vnd.ksql.v1 json; charset=utf-8"
-d "$JSON_DATA"