Как я могу выбрать внутренние поля CDC JSON в качестве ключа записи в Kafka с помощью SMT?

#apache-kafka #apache-kafka-connect #debezium

Вопрос:

Я попытался использовать значение параметра конфигурации SMT и ключ ExtractField$для следующих данных CDC JSON. Но поскольку поле идентификатора является внутренним, оно выдает мне ошибку, так как поле не распознано. Как я могу сделать его доступным для внутренних полей ?

 "before": null,
"after": {
  "id": 4,
  "salary": 5000
},
"source": {
  "version": "1.5.0.Final",
  "connector": "mysql",
  "name": "Try-",
  "ts_ms": 1623834752000,
  "snapshot": "false",
  "db": "mysql_db",
  "sequence": null,
  "table": "EmpSalary",
  "server_id": 1,
  "gtid": null,
  "file": "binlog.000004",
  "pos": 374,
  "row": 0,
  "thread": null,
  "query": null
},
"op": "c",
"ts_ms": 1623834752982,
"transaction": null
}
 

Используемая Конфигурация:

 transforms=createKey,extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id
 

Ответ №1:

   transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
    transforms.extractInt.field=id
   
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
 

С помощью этих преобразований и изменений в файле свойств. Я мог бы сделать это возможным.

Ответ №2:

К сожалению, доступ к вложенным полям невозможен без использования другого преобразования.

Если вы хотите использовать встроенные, вам нужно будет извлечь состояние после, прежде чем вы сможете получить доступ к его полям

 transforms=extractAfterState,createKey,extractInt

# Add these
transforms.extractAfterState.type=io.debezium.transforms.ExtractNewRecordState
# since you cannot get the ID from null events
transforms.extractAfterState.drop.tombstones=true 

transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id
 

Комментарии:

1. То есть вы имеете в виду, что после извлечения состояния после мне нужно выполнить преобразования с использованием KStreams или KSQL? Невозможно установить ключ для такого вложенного JSON на лету, вы это имеете в виду?

2. Это невозможно с org.apache.kafka.connect.transforms.ValueToKey помощью . Я не говорил, что это невозможно с помощью другого преобразования, если оно существует, или комбинации преобразований