Поток, созданный в ksqlDB, показывает НУЛЕВОЕ значение

#apache-kafka #confluent-platform #ksqldb

Вопрос:

Я пытаюсь создать поток в ksqlDB, чтобы получить данные из темы кафки и выполнить запрос по ней.

 CREATE STREAM test_location (
  id VARCHAR,
  name VARCHAR,
  location VARCHAR
  )

 WITH (KAFKA_TOPIC='public.location',
       VALUE_FORMAT='JSON',
       PARTITIONS=10);
 

Данные в разделах public.location представлены в формате JSON.

ОБНОВЛЕНО сообщение по теме.

 print 'public.location' from beginning limit 1;
Key format: ¯_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2021/05/23 11:27:39.429 Z, key: <null>, value: {"sourceTable":{"id":"1","name":Sam,"location":Manchester,"ConnectorVersion":null,"connectorId":null,"ConnectorName":null,"DbName":null,"DbSchema":null,"TableName":null,"payload":null,"schema":null},"ConnectorVersion":null,"connectorId":null,"ConnectorName":null,"DbName":null,"DbSchema":null,"TableName":null,"payload":null,"schema":null}, partition: 3
 

После создания потока и выполнения ВЫБОРА в созданном потоке я получаю значение NULL на выходе. Хотя в теме есть данные.

 select * from test_location
>EMIT CHANGES limit 5;
 ----------------------------------------------------------------- ----------------------------------------------------------------- ----------------------------------------------------------------- 
|ID                                                               |NAME                                                            |LOCATION                                                          |
 ----------------------------------------------------------------- ----------------------------------------------------------------- ----------------------------------------------------------------- 
|null                                                             |null                                                             |null                                                             |
|null                                                             |null                                                             |null                                                             |
|null                                                             |null                                                             |null                                                             |
|null                                                             |null                                                             |null                                                             |
|null                                                             |null                                                             |null                                                             |
Limit Reached
Query terminated
 

Вот подробная информация из файла docker

 version: '2'

services:

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.18.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      # Configuration to embed Kafka Connect support.
      KSQL_CONNECT_GROUP_ID: "ksql-connect-01"
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      KSQL_CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "_ksql-connect-01-configs"
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "_ksql-connect-01-offsets"
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: "_ksql-connect-01-statuses"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins"
 

Обновить:
Вот сообщение в теме, которое я вижу в Кафке

 {
   "sourceTable": {
      "id": "1",
      "name": Sam,
      "location": Manchester,
      "ConnectorVersion": null,
      "connectorId": null,
      "ConnectorName": null,
      "DbName": null,
      "DbSchema": null,
      "TableName": null,
      "payload": null,
      "schema": null
   },
   "ConnectorVersion": null,
   "connectorId": null,
   "ConnectorName": null,
   "DbName": null,
   "DbSchema": null,
   "TableName": null,
   "payload": null,
   "schema": null
}
 

Какой шаг или конфигурация мне не хватает?

Комментарии:

1. Пожалуйста, покажите свои фактические данные по теме

2. @OneCricketeer Здравствуйте, я обновил тему в теме ksqlDB, а также добавил, как она выглядит в теме Кафки… нуль в данных на самом деле является нулевыми полями…

3. Эти данные пришли из дебезиума? Ваша схема полезная нагрузка полностью равны нулю, поэтому нет полей идентификатора,имени,местоположения для получения. Кроме того, чтобы проанализировать эти данные, вам нужно было бы определить payload поле в ksql или сообщить Debezium, чтобы извлечь полезную нагрузку из события

4. @OneCricketeer Эти данные поступают из процессора SCDF в Кафку. Это в основном ETL, который также содержит debezium в соединителе «начало как источник», но конкретно эта тема была создана путем обработки данных в SCDF.

5. Ладно, хорошо, мой комментарий все еще в силе. Как вы показали, в вашем сообщении id,name,location вообще ничего нет, поэтому вы должны ожидать, что увидите только нули в KSQL. И если вы хотите иметь возможность анализировать эти данные, вам нужно настроить KSQL так, чтобы в нем было хотя бы payload STRUCT поле

Ответ №1:

Учитывая вашу полезную нагрузку, вам нужно будет объявить схему вложенной, потому id что , name , и location не являются полями «верхнего уровня» в Json, но они вложены внутри sourceTable .

 CREATE STREAM est_location (
  sourceTable STRUCT<id VARCHAR, name VARCHAR, location VARCHAR>
)
 

Невозможно «развернуть» данные при определении схемы, но схема должна соответствовать тому, что указано в разделе. В дополнение к sourceTable этому вы также можете добавить ConnectorVersion etc в схему, так как они также являются полями «верхнего уровня» в вашем JSON. Суть в том, что столбец в ksqlDB может быть объявлен только в поле верхнего уровня. Все остальное-это вложенные данные, к которым вы можете получить доступ с помощью STRUCT типа.

Конечно, позже, когда вы сделаете запрос est_location , вы сможете ссылаться на отдельные поля через sourceTable->id etc.

Также можно было бы объявить производный ПОТОК, если вы хотите отменить проверку схемы:

 CREATE STREAM unnested_est_location AS
  SELECT sourceTable->id AS id,
         sourceTable->name AS name,
         sourceTable->location AS location
  FROM est_location;
 

Конечно, это позволило бы записать данные в новую тему.