Messages are not flowing from the MongoDB database to the Kafka topic

#mongodb #apache-kafka #mongodb-kafka-connector

Question:

I am using the MongoDB Kafka source connector v1.6… Kafka Connect is running in distributed mode. The problem is that messages from the MongoDB database are not being published to the corresponding Kafka topics.

Logs:

 INFO Opened connection [connectionId{localValue:7, serverValue:283461}] to DB (org.mongodb.driver.connection:71)
 [2021-12-05 10:59:14,616] INFO Opened connection [connectionId{localValue:8, serverValue:283462}] to DB (org.mongodb.driver.connection:71)
 [2021-12-05 10:59:16,021] INFO Copying existing data on the following namespaces: [ecaf-staging.augmentPlanRelationship, ecaf-staging.augmentPlan, ecaf-staging.device, ecaf-staging.location] (com.mongodb.kafka.connect.source.MongoCopyDataManager:104)
 [2021-12-05 10:59:16,035] INFO Started MongoDB source task (com.mongodb.kafka.connect.source.MongoSourceTask:203)
 [2021-12-05 10:59:16,036] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:225)
 [2021-12-05 10:59:16,386] INFO Opened connection [connectionId{localValue:9, serverValue:283463}] to DB (org.mongodb.driver.connection:71)
 [2021-12-05 10:59:16,394] INFO Opened connection [connectionId{localValue:10, serverValue:283464}] to DB (org.mongodb.driver.connection:71)
 [2021-12-05 10:59:24,042] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
 [2021-12-05 10:59:31,037] INFO Shutting down executors (com.mongodb.kafka.connect.source.MongoSourceTask:604)
 [2021-12-05 10:59:31,037] INFO Finished copying existing data from the collection(s). (com.mongodb.kafka.connect.source.MongoSourceTask:611)
 [2021-12-05 10:59:31,038] INFO Watching for database changes on 'ecaf-staging' (com.mongodb.kafka.connect.source.MongoSourceTask:677)
 [2021-12-05 10:59:31,066] INFO Resuming the change stream after the previous offset: {"_data": "8261AC9B83000023282B0229296E04"} (com.mongodb.kafka.connect.source.MongoSourceTask:415)
 [2021-12-05 10:59:34,043] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
 [2021-12-05 10:59:44,044] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
 [2021-12-05 10:59:54,045] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
 [2021-12-05 11:00:04,045] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
 [2021-12-05 11:00:14,053] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
 [2021-12-05 11:00:24,053] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
 [2021-12-05 11:00:34,054] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
 [2021-12-05 11:00:44,055] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)

Below is my configuration file:

 {
   "name": "mongo-DB",
   "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
   "tasks.max": "1",
   "connection.uri": "",
   "database": "DB",
   "copy.existing": "true",
   "copy.existing.namespace.regex": "DB.augmentPlan$|DB.device$|DB.location$|DB.augmentPlanRelationship",
   "topic.namespace.map": "{\"DB.augmentPlan\": \"ecaf-cdc-augment-plans\", \"DB.augmentPlanRelationship\": \"ecaf-cdc-augment-plan-mappings\", \"DB.device\": \"ecaf-cdc-devices\", \"DB.location\": \"ecaf-cdc-locations\"}",
   "poll.max.batch.size": "1000",
   "poll.await.time.ms": "5000",
   "pipeline": "[{\"$match\":{\"ns.coll\": {\"$regex\": \"/^(DB.augmentPlan|DB.device|DB.location)$/\"}}}]",
   "batch.size": "1",
   "change.stream.full.document": "updateLookup",
   "key.converter": "org.apache.kafka.connect.storage.StringConverter",
   "value.converter": "org.apache.kafka.connect.storage.StringConverter",
   "key.converter.schemas.enable": "false",
   "value.converter.schemas.enable": "false",
   "publish.full.document.only": "true"
 }

Please help me fix this.
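
One thing that may be worth checking is whether the `$match` stage in the `pipeline` setting can match any change event at all. The sketch below is only a rough local approximation and does not confirm the cause. It uses Python's `re` module in place of MongoDB's regex engine, and it assumes that `ns.coll` in a change event holds only the collection name (the database name being in `ns.db`). The `candidate` pattern and the `matching_collections` helper are hypothetical and used only for comparison.

```python
import re

# Pattern string copied verbatim from the "pipeline" setting in the config.
PIPELINE_REGEX = "/^(DB.augmentPlan|DB.device|DB.location)$/"

# Assumption: ns.coll in a change event is the bare collection name,
# without the database prefix.
SAMPLE_COLLECTIONS = ["augmentPlan", "device", "location", "augmentPlanRelationship"]


def matching_collections(pattern, collections):
    """Return the names the pattern matches, using an unanchored search."""
    compiled = re.compile(pattern)
    return [name for name in collections if compiled.search(name)]


if __name__ == "__main__":
    # The slashes are part of the string, so they are treated as literal characters.
    print("as configured:", matching_collections(PIPELINE_REGEX, SAMPLE_COLLECTIONS))

    # Hypothetical alternative: no surrounding slashes and no database prefix.
    candidate = "^(augmentPlan|device|location)$"
    print("candidate:", matching_collections(candidate, SAMPLE_COLLECTIONS))
```

If the configured pattern matches none of the sample names here, the pipeline would filter out every change event under the same assumptions, which would be consistent with the repeated "flushing 0 outstanding messages" lines in the log.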