#mongodb #apache-kafka #mongodb-kafka-connector
Question:
I am using the MongoDB Kafka source connector v1.6… Kafka Connect is running in distributed mode. The problem is that messages from the MongoDB database are not published to the corresponding Kafka topic.
Logs:
INFO Opened connection [connectionId{localValue:7, serverValue:283461}] to DB (org.mongodb.driver.connection:71)
[2021-12-05 10:59:14,616] INFO Opened connection [connectionId{localValue:8, serverValue:283462}] to DB (org.mongodb.driver.connection:71)
[2021-12-05 10:59:16,021] INFO Copying existing data on the following namespaces: [ecaf-staging.augmentPlanRelationship, ecaf-staging.augmentPlan, ecaf-staging.device, ecaf-staging.location] (com.mongodb.kafka.connect.source.MongoCopyDataManager:104)
[2021-12-05 10:59:16,035] INFO Started MongoDB source task (com.mongodb.kafka.connect.source.MongoSourceTask:203)
[2021-12-05 10:59:16,036] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:225)
[2021-12-05 10:59:16,386] INFO Opened connection [connectionId{localValue:9, serverValue:283463}] to DB (org.mongodb.driver.connection:71)
[2021-12-05 10:59:16,394] INFO Opened connection [connectionId{localValue:10, serverValue:283464}] to DB (org.mongodb.driver.connection:71)
[2021-12-05 10:59:24,042] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 10:59:31,037] INFO Shutting down executors (com.mongodb.kafka.connect.source.MongoSourceTask:604)
[2021-12-05 10:59:31,037] INFO Finished copying existing data from the collection(s). (com.mongodb.kafka.connect.source.MongoSourceTask:611)
[2021-12-05 10:59:31,038] INFO Watching for database changes on 'ecaf-staging' (com.mongodb.kafka.connect.source.MongoSourceTask:677)
[2021-12-05 10:59:31,066] INFO Resuming the change stream after the previous offset: {"_data": "8261AC9B83000023282B0229296E04"} (com.mongodb.kafka.connect.source.MongoSourceTask:415)
[2021-12-05 10:59:34,043] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 10:59:44,044] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 10:59:54,045] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 11:00:04,045] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 11:00:14,053] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 11:00:24,053] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 11:00:34,054] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 11:00:44,055] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
Below is my configuration file:
{
  "name": "mongo-DB",
  "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
  "tasks.max": "1",
  "connection.uri": "",
  "database": "DB",
  "copy.existing": "true",
  "copy.existing.namespace.regex": "DB.augmentPlan$|DB.device$|DB.location$|DB.augmentPlanRelationship",
  "topic.namespace.map": "{\"DB.augmentPlan\" : \"ecaf-cdc-augment-plans\",\"DB.augmentPlanRelationship\" : \"ecaf-cdc-augment-plan-mappings\",\"DB.device\" : \"ecaf-cdc-devices\",\"DB.location\" : \"ecaf-cdc-locations\"}",
  "poll.max.batch.size": "1000",
  "poll.await.time.ms": "5000",
  "pipeline": "[{\"$match\":{\"ns.coll\": {\"$regex\": \"/^(DB.augmentPlan|DB.device|DB.location)$/\"}}}]",
  "batch.size": "1",
  "change.stream.full.document": "updateLookup",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false",
  "publish.full.document.only": "true"
}
Please help me fix this.
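One thing that may be worth checking, offered as a sketch rather than a confirmed diagnosis: the `pipeline` setting in the configuration above filters change events on `ns.coll`. In a MongoDB change event that field holds only the collection name (the database name is in `ns.db`). Also, when `$regex` is given a string, the slashes are part of the pattern rather than delimiters. The Python snippet below tests the configured pattern against plain collection names. It assumes Python's `re` module behaves like MongoDB's regex engine for this simple pattern. `CANDIDATE_PATTERN` and `matching_collections` are hypothetical names introduced here for illustration and are not part of the connector.

```python
import re

# Pattern string exactly as it appears in the "pipeline" setting above.
CONFIGURED_PATTERN = "/^(DB.augmentPlan|DB.device|DB.location)$/"

# Hypothetical alternative (an assumption, not a verified fix):
# no slash delimiters, and collection names without the database prefix.
CANDIDATE_PATTERN = r"^(augmentPlan|device|location)$"

# Collection names as they would appear in a change event's ns.coll field.
COLLECTIONS = ["augmentPlan", "augmentPlanRelationship", "device", "location"]


def matching_collections(pattern, names):
    """Return the names in which the pattern is found (unanchored search)."""
    compiled = re.compile(pattern)
    return [name for name in names if compiled.search(name)]


if __name__ == "__main__":
    print("configured:", matching_collections(CONFIGURED_PATTERN, COLLECTIONS))
    print("candidate: ", matching_collections(CANDIDATE_PATTERN, COLLECTIONS))
```

If the first list comes back empty, the `$match` stage would filter out every change event, which would be consistent with the repeated `flushing 0 outstanding messages` lines in the log, though other causes are possible. Note that neither pattern covers `augmentPlanRelationship`, which does appear in `topic.namespace.map`.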