Создание таблицы в kafka с использованием ksqldb-server

#apache-kafka #confluent-platform #ksqldb

#apache-kafka #платформа confluent #ksqldb

Вопрос:

Я пытаюсь создать таблицу kafka с использованием (Confluent) ksqldb-сервера через его интерфейс REST, используя следующий код (скрипт bash):

 KSQLDB_COMMAND="CREATE TABLE sample_table 
  (xkey VARCHAR, 
   xdata VARCHAR) 
  WITH (KAFKA_TOPIC='sample-topic', 
        VALUE_FORMAT='JSON', 
       KEY='xkey'); "

COMMAND="curl -X 'POST' '$KSQLDB_SERVER' 
    -H 'Content-Type: application/vnd.ksql.v1 json; charset=utf-8' 
    -d '{ "ksql": "$KSQLDB_COMMAND" }' "
eval $COMMAND
 

Возвращается следующее сообщение об ошибке:

 {"@type":"statement_error","error_code":40001,"message":"Failed to prepare statement: Invalid config variable(s) in the WITH clause: KEY","statementText":"CREATE TABLE sample_table (xkey VARCHAR, xdata VARCHAR) WITH (KAFKA_TOPIC='sample-topic', VALUE_FORMAT='JSON', KEY='xkey');","entities":[]}%
 

Ошибка указывает на ошибку в фактическом операторе, в частности, с ключевым атрибутом.

Я могу получить базовые команды («ПОТОКИ СПИСКОВ» и т. Д.), Работающие с использованием интерфейса REST, Но не могу создавать таблицы, поэтому я полагаю, что это проблема в инструкции KSQL или в том, как я создаю команду bash (в переменной «COMMAND»).).

Любая помощь приветствуется.

Комментарии:

1. Это просто строковая переменная bash. Пожалуйста, покажите ваш фактический вызов REST

2. Мои извинения… Я добавил выполнение команды…

3. Согласно документам, вы должны поместить PRIMARY KEY в столбец, а не в WITH docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference /…

4. Проверил документы по предоставленной вами ссылке… в примере внизу размещенной страницы есть предложение «WITH», так что, насколько я вижу, это не проблема … при этом я попытался добавить ПЕРВИЧНЫЙ КЛЮЧ, как вы указали, и получил идентичную ошибку.

5. Ни один пример на этой странице не вставляет KEY тело WITH, поскольку ошибка, которую вы получаете, указывает на проблему, а не на ваш синтаксис bash

Ответ №1:

Я потратил немало времени на эксперименты и получил этот простой пример (моя первоначальная попытка потребовала слишком большого количества замен переменных bash, чтобы сделать ее полезной / поддерживаемой, поэтому эта версия немного упрощена). Я также обнаружил, что имена таблиц KSQLDB должны соответствовать обычным соглашениям об именовании SQL для имен таблиц (т.е. альфа, подчеркивания и т.д… но нет дефисов, что вызвало кучу ошибок в моем первоначальном вопросе… Я должен был более внимательно прочитать документацию).

Работает следующее (возможно, вам потребуется изменить адрес вашего сервера KSQLDB)… и с минимальными изменениями может быть выполнена практически любая команда KSQLDB:

 ####
# NOTE: table MUST be alpha (underscores are OK)... hyphens are not allowed
####
KSQLDB_SERVER="http://localhost:8088/ksql"
KSQLDB_TABLE="some_table"
KSQLDB_TOPIC="some_topic"
VALUE_FORMAT="JSON"

FMT="{ "ksql": "CREATE TABLE %s (key VARCHAR PRIMARY KEY, data VARCHAR) WITH (KAFKA_TOPIC='%s', VALUE_FORMAT='%s');" }"
JSON_DATA=$(printf "$FMT" "$KSQLDB_TABLE" "$KSQLDB_TOPIC" "$VALUE_FORMAT")

curl -X "POST" "$KSQLDB_SERVER" 
     -H "Content-Type: application/vnd.ksql.v1 json; charset=utf-8" 
     -d "$JSON_DATA"