Столкнулся с проблемой при сохранении данных в БД SQL с использованием соединителя приемника JDBC

#mysql #jdbc #apache-kafka #apache-kafka-connect #confluent-platform

#mysql #jdbc #apache-kafka #apache-kafka-connect #confluent-платформа

Вопрос:

Использование частично открытой модели содержимого в JSONSchema. Это означает, что будут некоторые обязательные поля (определенные в свойствах) и необязательные поля (как определено в patternProperties), принятые темой. пожалуйста, обратитесь к производителю консоли для получения схемы JSON и примера данных. Свойства таблицы DB приведены ниже. Теперь в конфигурации соединителя я использовал «fields.whitelist»: «noOfVisits, downloadCount». Соединитель должен выбрать эти поля из сообщения темы и сохранить их в БД, если эти поля присутствуют в сообщении, верно? но для каждого случая в DB он сохраняет null. Пожалуйста, обратитесь к таблице DB, как показано ниже, для получения того же примера входных сообщений.

—> Производитель консоли:

 ./kafka-json-schema-console-producer 
    --broker-list localhost:9092 
    --topic DAILY_TEST_STATS_51 
    --property key.schema='{"type":"string"}' --property parse.key=true --property "key.separator=>" 
    --property value.schema='{
  "type": "object",
  "properties": {
    "viewerId": {
      "type": "integer",
      "optional": false
    }
  },
  "patternProperties": {
    "^[a-zA-Z]*$": {
      "type": "integer"
    }
  },
  "additionalProperties": false
}'
"203">{"viewerId":203,"noOfVisits":10,"downloadCount":111}
"204">{"viewerId":204,"downloadCount":111}
"205">{"viewerId":205,"noOfVisits":10}
"206">{"viewerId":206,"noOfVisits":10,"downloadCount":111}
"207">{"viewerId":207,"noOfVisits":10,"downloadCount":111}
"208">{"viewerId":208,"noOfVisits":10,"downloadCount":111}
 

—> Свойства таблицы DB:

 CREATE TABLE IF NOT EXISTS `Temp51` (
  `viewerId` int(10) NOT NULL,
  `noOfVisits` int(5) NULL,
  `downloadCount` int(5) NULL,
  PRIMARY KEY (`viewerId`)
);
 

—> Конфигурация соединителя:

 {
    "name": "temp-jdbc-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
        "value.converter.schemas.enable": "true",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "connection.url": "jdbc:mysql://localhost:3306/StreamTest1",
        "connection.user": "root",
        "connection.password": "root",
        "tasks.max": "1",
        "insert.mode": "upsert",
        "topics": "DAILY_TEST_STATS_51",
        "pk.mode": "record_value",
        "pk.fields": "viewerId",
        "table.name.format": "Temp51",
        "fields.whitelist": "noOfVisits,downloadCount"
    }
}
 

—> Таблица БД: Temp51

 # viewerId, noOfVisits, downloadCount
203, , 
204, , 
205, , 
206, , 
207, , 
208, , 
 

Комментарии:

1. Как использовать реестр схем?

2. У меня конфлюэнтная настройка. итак, я использую реестр confluent-schema. эта схема должным образом присутствует в реестре схем.