#apache-kafka #confluent-platform #ksqldb
Вопрос:
Я пытаюсь создать поток в ksqlDB, чтобы получить данные из темы кафки и выполнить запрос по ней.
CREATE STREAM test_location (
id VARCHAR,
name VARCHAR,
location VARCHAR
)
WITH (KAFKA_TOPIC='public.location',
VALUE_FORMAT='JSON',
PARTITIONS=10);
Данные в разделах public.location представлены в формате JSON.
ОБНОВЛЕНО сообщение по теме.
print 'public.location' from beginning limit 1;
Key format: ¯_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2021/05/23 11:27:39.429 Z, key: <null>, value: {"sourceTable":{"id":"1","name":Sam,"location":Manchester,"ConnectorVersion":null,"connectorId":null,"ConnectorName":null,"DbName":null,"DbSchema":null,"TableName":null,"payload":null,"schema":null},"ConnectorVersion":null,"connectorId":null,"ConnectorName":null,"DbName":null,"DbSchema":null,"TableName":null,"payload":null,"schema":null}, partition: 3
После создания потока и выполнения ВЫБОРА в созданном потоке я получаю значение NULL на выходе. Хотя в теме есть данные.
select * from test_location
>EMIT CHANGES limit 5;
----------------------------------------------------------------- ----------------------------------------------------------------- -----------------------------------------------------------------
|ID |NAME |LOCATION |
----------------------------------------------------------------- ----------------------------------------------------------------- -----------------------------------------------------------------
|null |null |null |
|null |null |null |
|null |null |null |
|null |null |null |
|null |null |null |
Limit Reached
Query terminated
Вот подробная информация из файла docker
version: '2'
services:
ksqldb-server:
image: confluentinc/ksqldb-server:0.18.0
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- schema-registry
ports:
- "8088:8088"
environment:
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
# Configuration to embed Kafka Connect support.
KSQL_CONNECT_GROUP_ID: "ksql-connect-01"
KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
KSQL_CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "_ksql-connect-01-configs"
KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "_ksql-connect-01-offsets"
KSQL_CONNECT_STATUS_STORAGE_TOPIC: "_ksql-connect-01-statuses"
KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
KSQL_CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins"
Обновить:
Вот сообщение в теме, которое я вижу в Кафке
{
"sourceTable": {
"id": "1",
"name": Sam,
"location": Manchester,
"ConnectorVersion": null,
"connectorId": null,
"ConnectorName": null,
"DbName": null,
"DbSchema": null,
"TableName": null,
"payload": null,
"schema": null
},
"ConnectorVersion": null,
"connectorId": null,
"ConnectorName": null,
"DbName": null,
"DbSchema": null,
"TableName": null,
"payload": null,
"schema": null
}
Какой шаг или конфигурация мне не хватает?
Комментарии:
1. Пожалуйста, покажите свои фактические данные по теме
2. @OneCricketeer Здравствуйте, я обновил тему в теме ksqlDB, а также добавил, как она выглядит в теме Кафки… нуль в данных на самом деле является нулевыми полями…
3. Эти данные пришли из дебезиума? Ваша схема полезная нагрузка полностью равны нулю, поэтому нет полей идентификатора,имени,местоположения для получения. Кроме того, чтобы проанализировать эти данные, вам нужно было бы определить
payload
поле в ksql или сообщить Debezium, чтобы извлечь полезную нагрузку из события4. @OneCricketeer Эти данные поступают из процессора SCDF в Кафку. Это в основном ETL, который также содержит debezium в соединителе «начало как источник», но конкретно эта тема была создана путем обработки данных в SCDF.
5. Ладно, хорошо, мой комментарий все еще в силе. Как вы показали, в вашем сообщении
id,name,location
вообще ничего нет, поэтому вы должны ожидать, что увидите только нули в KSQL. И если вы хотите иметь возможность анализировать эти данные, вам нужно настроить KSQL так, чтобы в нем было хотя быpayload STRUCT
поле
Ответ №1:
Учитывая вашу полезную нагрузку, вам нужно будет объявить схему вложенной, потому id
что , name
, и location
не являются полями «верхнего уровня» в Json, но они вложены внутри sourceTable
.
CREATE STREAM est_location (
sourceTable STRUCT<id VARCHAR, name VARCHAR, location VARCHAR>
)
Невозможно «развернуть» данные при определении схемы, но схема должна соответствовать тому, что указано в разделе. В дополнение к sourceTable
этому вы также можете добавить ConnectorVersion
etc в схему, так как они также являются полями «верхнего уровня» в вашем JSON. Суть в том, что столбец в ksqlDB может быть объявлен только в поле верхнего уровня. Все остальное-это вложенные данные, к которым вы можете получить доступ с помощью STRUCT
типа.
Конечно, позже, когда вы сделаете запрос est_location
, вы сможете ссылаться на отдельные поля через sourceTable->id
etc.
Также можно было бы объявить производный ПОТОК, если вы хотите отменить проверку схемы:
CREATE STREAM unnested_est_location AS
SELECT sourceTable->id AS id,
sourceTable->name AS name,
sourceTable->location AS location
FROM est_location;
Конечно, это позволило бы записать данные в новую тему.