kafka connect — как фильтровать метаданные схемы из полезной нагрузки

#apache-kafka #apache-kafka-connect

#apache-kafka #apache-kafka-connect

Вопрос:

Я пытаюсь удалить схему из полезной нагрузки, и вот конфигурации

connector.properties

 name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
value.converter=org.apache.kafka.connect.json.JsonConverter
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/employee_db?user=rootamp;password=root
table.whitelist=testemp
mode=incrementing
incrementing.column.name=employee_id
topic.prefix=test-mysql-jdbc-
  

и ниже приведены мои worker.properties

 bootstrap.servers=localhost:9092


key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false


internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets

offset.flush.interval.ms=10000
plugin.path=C:UsersnameDesktopkafkalibs
  

вывод:

 {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"employee_id"},{"type":"string","optional":false,"field":"first_name"}],"optional":false,"name":"testemp"},"payload":{"employee_id":2,"first_name":"test"}}
  

исключенный вывод:

 {"payload":{"employee_id":2,"first_name":"test"}}
  

Я попытался отключить value.converter.schemas.enable= false в worker, как было предложено здесь, по-прежнему без эффекта

Я что-то упускаю?

Комментарии:

1. Почему вы хотите отключить схему?

2. «бизнес-требования» вы знаете, как они работают.

3. X-D Используйте Avro для всестороннего выигрыша.

Ответ №1:

Есть два варианта исправить это:

  • Удалите value.converter свойство из конфигурации вашего соединителя (вы используете то же value.converter самое)
  • Установите value.converter.schemas.enable=false в вашей конфигурации соединителя.

Схема добавляется в сообщение, потому что вы перезаписали конвертер значений и не отключили схему (по умолчанию для JsonConverter схемы включена). С точки зрения Kafka Connect вы использовали совершенно новый конвертер (он не будет использовать свойства из глобальной конфигурации)

Если вы отключите схему, ваше сообщение будет выглядеть следующим образом:

 {
    "employee_id": 2,
    "first_name":"test"
}