#postgresql #jdbc #apache-kafka #apache-kafka-connect #confluent-platform
Вопрос:
У меня есть тема, в которой данные содержат только ключ записи, а значение записи равно нулю. Мне нужно записать поля из ключа записи в таблицу через соединитель приемника, но я получаю сообщение об ошибке. Как вставить значения из ключа записи, игнорируя значение записи = null?
{
"name": "ObjectForDelete.sink",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"transforms": [
"uuid",
"name"
],
"topics": [
"ObjectForDeletePG"
],
"transforms.uuid.type": "org.apache.kafka.connect.transforms.InsertField$Key",
"transforms.uuid.topic.field": "key.uuid",
"transforms.name.type": "org.apache.kafka.connect.transforms.InsertField$Key",
"transforms.name.topic.field": "key.name",
"connection.url": "jdbc:postgresql://127.0.0.1:5432/ulk",
"connection.user": "root",
"connection.password": "****",
"dialect.name": "PostgreSqlDatabaseDialect",
"insert.mode": "insert",
"table.name.format": "ObjectForDelete",
"pk.mode": "none",
"pk.fields": [],
"db.timezone": "Europe/Kiev"
}
ошибка:
Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'ObjectForDelete.sink' is configured with 'delete.enabled=false' and 'pk.mode=kafka' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='ObjectForDeletePG',partition=0,offset=0,timestamp=1623758581060) with a null value and null value schema.
Комментарии:
1. Как говорится в ошибке, она не будет разрешать записи с нулевым значением, когда
configured with 'delete.enabled=false' and 'pk.mode=kafka'
. Значения Null обрабатываются как удаления, а не вставки/обновления. Если вы хотите записать эти поля, переместите ключ в часть записи значение2. Можно ли это сделать с помощью kafka connect? Существует преобразование valueToKey, но обратного преобразования нет.
3.
ExtractField
можно использовать, так как ваши значения в любом случае равны нулю