#apache-kafka #apache-kafka-connect #mongodb-kafka-connector
#apache-kafka #apache-kafka-connect #mongodb-kafka-connector
Вопрос:
Я могу вставлять / обновлять документы в mongo, но я изо всех сил пытаюсь удалить документы.
Вот как данные записываются в раздел kafka с помощью debezium connect из источника SQL Server (последняя строка — это то, как выглядит операция удаления):
{"user_code":1001} {"user_code":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"}
{"user_code":1002} {"user_code":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"}
{"user_code":1003} {"user_code":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"}
{"user_code":1003} null
В этом примере, даже если после значения NULL, документ с user_code 1003 все еще находится в MongoDB.
Ниже показано, как я настраиваю свой MongoSinkConnector (я уже пробовал оба mongodb.delete.on.null.values
, delete.on.null.values
но ни один из них не работал):
{
"name": "inventory-connector-sink-2",
"config": {
"connector.class" : "com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max" : "1",
"topics": "server1.dbo.customers",
"connection.uri": "mongodb://root:root@mongo:27017/",
"database": "testDB",
"collection": "customers_2",
"database.history.kafka.bootstrap.servers" : "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy",
"writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy",
"mongodb.delete.on.null.values": true
}
}
Я также пытался использовать PartialValueStrategy, но безуспешно.
PS: Я работаю с confluentinc / cp-kafka-connect docker image для моего приемного разъема.