#mysql #jdbc #apache-kafka #apache-kafka-connect #confluent-platform
#mysql #jdbc #apache-kafka #apache-kafka-connect #confluent-платформа
Вопрос:
Использование частично открытой модели содержимого в JSONSchema. Это означает, что будут некоторые обязательные поля (определенные в свойствах) и необязательные поля (как определено в patternProperties), принятые темой. пожалуйста, обратитесь к производителю консоли для получения схемы JSON и примера данных. Свойства таблицы DB приведены ниже. Теперь в конфигурации соединителя я использовал «fields.whitelist»: «noOfVisits, downloadCount». Соединитель должен выбрать эти поля из сообщения темы и сохранить их в БД, если эти поля присутствуют в сообщении, верно? но для каждого случая в DB он сохраняет null. Пожалуйста, обратитесь к таблице DB, как показано ниже, для получения того же примера входных сообщений.
—> Производитель консоли:
./kafka-json-schema-console-producer
--broker-list localhost:9092
--topic DAILY_TEST_STATS_51
--property key.schema='{"type":"string"}' --property parse.key=true --property "key.separator=>"
--property value.schema='{
"type": "object",
"properties": {
"viewerId": {
"type": "integer",
"optional": false
}
},
"patternProperties": {
"^[a-zA-Z]*$": {
"type": "integer"
}
},
"additionalProperties": false
}'
"203">{"viewerId":203,"noOfVisits":10,"downloadCount":111}
"204">{"viewerId":204,"downloadCount":111}
"205">{"viewerId":205,"noOfVisits":10}
"206">{"viewerId":206,"noOfVisits":10,"downloadCount":111}
"207">{"viewerId":207,"noOfVisits":10,"downloadCount":111}
"208">{"viewerId":208,"noOfVisits":10,"downloadCount":111}
—> Свойства таблицы DB:
CREATE TABLE IF NOT EXISTS `Temp51` (
`viewerId` int(10) NOT NULL,
`noOfVisits` int(5) NULL,
`downloadCount` int(5) NULL,
PRIMARY KEY (`viewerId`)
);
—> Конфигурация соединителя:
{
"name": "temp-jdbc-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://localhost:8081",
"connection.url": "jdbc:mysql://localhost:3306/StreamTest1",
"connection.user": "root",
"connection.password": "root",
"tasks.max": "1",
"insert.mode": "upsert",
"topics": "DAILY_TEST_STATS_51",
"pk.mode": "record_value",
"pk.fields": "viewerId",
"table.name.format": "Temp51",
"fields.whitelist": "noOfVisits,downloadCount"
}
}
—> Таблица БД: Temp51
# viewerId, noOfVisits, downloadCount
203, ,
204, ,
205, ,
206, ,
207, ,
208, ,
Комментарии:
1. Как использовать реестр схем?
2. У меня конфлюэнтная настройка. итак, я использую реестр confluent-schema. эта схема должным образом присутствует в реестре схем.