ksql — СОЗДАНИЕ ТАБЛИЦЫ приводит к таблице с нулевыми значениями, даже если раздел kafka заполнен

#apache-kafka #apache-kafka-connect #ksqldb

#apache-kafka #apache-kafka-connect #ksqldb

Вопрос:

Используя ksqlDB, я создал соединитель JDBC с пользовательским запросом. Затем из результирующей темы kafka я создал таблицу. Однако при выборе из таблицы возвращаются данные только для ПЕРВИЧНОГО КЛЮЧА, а для всех остальных значений возвращается значение null. База данных postgres, к которой я подключаюсь, имеет таблицу продаж, постоянно обновляемую новыми данными, которые я пытаюсь передавать с помощью ksql.

 ksql> CREATE SOURCE CONNECTOR con WITH (
  'connector.class'      ='io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url'       = '....',
  'topic.prefix'         = 'sales',
  ...
  'key'                  = 'id',
  'query'                = 'SELECT id, time, price FROM sales');

Message

Created connector CON

ksql> print sales limit 1;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2020/11/30 09:07:55.109 Z, key: [123], value: {"schema":{"type":"struct","fields":[{"type":"string","optional":alse,"field":"id"},{"type":"int64","optional":true,"field":"time"},{"type":"float","optional":true,"field":"price"}],"optional":false},"payload":{"id":"123","time":1,"price":10.0}}
Topic printing ceased

ksql> CREATE TABLE sales_table (id VARCHAR PRIMARY KEY, time INT, price DOUBLE) WITH (kafka_topic='sales', partitions=1, value_format='JSON');

Message

Table created

ksql> SELECT * FROM sales_table EMIT CHANGES LIMIT 1;
 ----- ----- ----- 
|ID   |TIME |PRICE|
 ----- ----- ----- 
|123  |null |null |
Limit Reached
Query terminated
 

Как вы можете видеть, в разделе kafka есть записи с правильными значениями в полях time и price. Однако, когда таблица создается по этой теме, выбор из таблицы приводит к нулевым полям времени и цены. Только идентификатор (который является столбцом ПЕРВИЧНОГО КЛЮЧА) печатается правильно.

Есть идеи, почему это происходит?

Ответ №1:

Вы используете org.apache.kafka.connect.json.JsonConverter конвертер в своем соединителе schemas.enable=true , поэтому ваша схема отсутствует (id VARCHAR PRIMARY KEY, time INT, price DOUBLE) , и, таким образом, вы получаете нулевые значения.

Лучше использовать io.confluent.connect.avro.AvroConverter (или Protobuf, или схему JSON) в вашем исходном соединителе, потому что тогда вам даже не нужно вводить схему для вашего CREATE STREAM , у вас просто есть

 CREATE TABLE sales_table  WITH (kafka_topic='sales', value_format='AVRO');
 

Вы указываете альтернативный конвертер таким образом:

 CREATE SOURCE CONNECTOR SOURCE_01 WITH (
…
    'key.converter'= 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter'= 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url'= 'http://schema-registry:8081'
    );
 

Но если вы должны использовать JSON, в вашем исходном соединителе отключите схемы:

 CREATE SOURCE CONNECTOR SOURCE_01 WITH (
…
    'value.converter.schemas.enable'= 'false'
    );
 

Ссылка: https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained