#apache-kafka #apache-kafka-connect #debezium
Вопрос:
Я попытался использовать значение параметра конфигурации SMT и ключ ExtractField$для следующих данных CDC JSON. Но поскольку поле идентификатора является внутренним, оно выдает мне ошибку, так как поле не распознано. Как я могу сделать его доступным для внутренних полей ?
"before": null,
"after": {
"id": 4,
"salary": 5000
},
"source": {
"version": "1.5.0.Final",
"connector": "mysql",
"name": "Try-",
"ts_ms": 1623834752000,
"snapshot": "false",
"db": "mysql_db",
"sequence": null,
"table": "EmpSalary",
"server_id": 1,
"gtid": null,
"file": "binlog.000004",
"pos": 374,
"row": 0,
"thread": null,
"query": null
},
"op": "c",
"ts_ms": 1623834752982,
"transaction": null
}
Используемая Конфигурация:
transforms=createKey,extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id
Ответ №1:
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id
key.converter.schemas.enable=false
value.converter.schemas.enable=false
С помощью этих преобразований и изменений в файле свойств. Я мог бы сделать это возможным.
Ответ №2:
К сожалению, доступ к вложенным полям невозможен без использования другого преобразования.
Если вы хотите использовать встроенные, вам нужно будет извлечь состояние после, прежде чем вы сможете получить доступ к его полям
transforms=extractAfterState,createKey,extractInt
# Add these
transforms.extractAfterState.type=io.debezium.transforms.ExtractNewRecordState
# since you cannot get the ID from null events
transforms.extractAfterState.drop.tombstones=true
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id
Комментарии:
1. То есть вы имеете в виду, что после извлечения состояния после мне нужно выполнить преобразования с использованием KStreams или KSQL? Невозможно установить ключ для такого вложенного JSON на лету, вы это имеете в виду?
2. Это невозможно с
org.apache.kafka.connect.transforms.ValueToKey
помощью . Я не говорил, что это невозможно с помощью другого преобразования, если оно существует, или комбинации преобразований