Настройка конвейера в MongoDB Kafka source connector

#mongodb #apache-kafka #apache-kafka-connect #mongodb-kafka-connector

#mongodb #апачи-кафка #apache-kafka-connect #mongodb-kafka-connector

Вопрос:

Я использую следующую конфигурацию для исходного соединителя для фильтрации и чтения только определенных записей со статусом «ОЖИДАНИЕ» из MongoDB. Необходимо опросить все записи, которые вставлены / обновлены, только со статусом ОЖИДАНИЯ. Соединитель источника может опрашивать все записи, если конвейер исключен. Может кто-нибудь помочь мне понять, как это исправить, а также есть ли способ узнать, что опрос завершен, как завершается пакетное задание, чтобы организовать другой процесс для потребителя kafka?

 name=mongo-source-demo
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# Connection and source configuration
connection.uri=mongodb://username:password@hostname:27017
database=test
collection=mongoDBtest
topic.prefix=mongodb.connector
poll.max.batch.size=1000
poll.await.time.ms=100000
publish.full.document.only=true
pipeline=[{"$match": { "Status" : "PENDING" }},{"$project":{"_id":1,"fullDocument":1}} ]
 

Комментарии:

1. pipeline=[{"$match": { "Status" : {"PENDING"} }},{"$project":{"_id":1,"fullDocument":1}} ] разве ожидание не должно быть также между {}?

2. не требуется. Конвейер работает на compass, но не в соединителе. Даже это само по себе не работает {«$match»: { «Status»:»ОЖИДАЮЩИЙ» }}

Ответ №1:

Проблема связана с «» в операторах MongoDB. Правильная конфигурация конвейера должна быть:

 pipeline=[{$match: { "Status" : "PENDING" }},{$project:{"_id":1,"fullDocument":1}} ]