#apache-kafka #apache-kafka-connect
#apache-kafka #apache-kafka-connect
Вопрос:
Я пытаюсь удалить схему из полезной нагрузки, и вот конфигурации
connector.properties
name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
value.converter=org.apache.kafka.connect.json.JsonConverter
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/employee_db?user=rootamp;password=root
table.whitelist=testemp
mode=incrementing
incrementing.column.name=employee_id
topic.prefix=test-mysql-jdbc-
и ниже приведены мои worker.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=C:UsersnameDesktopkafkalibs
вывод:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"employee_id"},{"type":"string","optional":false,"field":"first_name"}],"optional":false,"name":"testemp"},"payload":{"employee_id":2,"first_name":"test"}}
исключенный вывод:
{"payload":{"employee_id":2,"first_name":"test"}}
Я попытался отключить value.converter.schemas.enable= false в worker, как было предложено здесь, по-прежнему без эффекта
Я что-то упускаю?
Комментарии:
1. Почему вы хотите отключить схему?
2. «бизнес-требования» вы знаете, как они работают.
3. X-D Используйте Avro для всестороннего выигрыша.
Ответ №1:
Есть два варианта исправить это:
- Удалите
value.converter
свойство из конфигурации вашего соединителя (вы используете то жеvalue.converter
самое) - Установите
value.converter.schemas.enable=false
в вашей конфигурации соединителя.
Схема добавляется в сообщение, потому что вы перезаписали конвертер значений и не отключили схему (по умолчанию для JsonConverter
схемы включена). С точки зрения Kafka Connect вы использовали совершенно новый конвертер (он не будет использовать свойства из глобальной конфигурации)
Если вы отключите схему, ваше сообщение будет выглядеть следующим образом:
{
"employee_id": 2,
"first_name":"test"
}