#java #mongodb #changestream
#java #mongodb #поток изменений
Вопрос:
У меня возникла проблема с mongodb (версия 4.2), в частности, с функциональностью потока изменений.
У меня есть кластер набора реплик, состоящий из 1 первичного, 1 вторичного и 1 арбитра, и в моем java-коде с API mongodb-driver-sync 4.2.0-beta1 у меня есть процесс .watch () для интересующей коллекции. Как показано ниже:
MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=replica");
MongoDatabase database = mongoClient.getDatabase("test");
MongoCollection<Document> collectionStream = database.getCollection("myCollection");
List<Bson> pipeline = Arrays.asList(Aggregates.match(Filters.and(Filters.in("operationType", Arrays.asList("insert", "update", "replace", "invalidate")))));
MongoCursor<ChangeStreamDocument<Document>> cursor = collectionStream.watch(pipeline).fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
ChangeStreamDocument<Document> streamedEvent = cursor.next();
System.out.println("Streamed event: " streamedEvent);
В принципе, поток работает нормально. Когда происходит операция вставки / обновления, событие распознается, и документ передается в потоковом режиме правильно.
Однако, когда один из двух узлов (первичный или вторичный, несущий данные) выходит из строя, наблюдатель прекращает передачу чего-либо.
Операции обновления / вставки в базе данных продолжаются нормально, но поток заблокирован. Как только я перезапускаю один из двух узлов, поток немедленно возобновляется корректно, а также показывает мне события, которые ранее не транслировались.
Если, с другой стороны, узел-арбитр выходит из строя, поток продолжает работать нормально.
Ниже мой файл rs.conf() и, как вы можете видеть, параметр WriteConcern равен 1.
{
"_id" : "replica",
"version" : 31,
"protocolVersion" : NumberLong(1),
"writeConcernMajorityJournalDefault" : true,
"members" : [
{
"_id" : 0,
"host" : "host1:27017",
"arbiterOnly" : false,
"buildIndexes" : true,
"hidden" : false,
"priority" : 1000,
"tags" : {
},
"slaveDelay" : NumberLong(0),
"votes" : 1
},
{
"_id" : 1,
"host" : "host2:27017",
"arbiterOnly" : false,
"buildIndexes" : true,
"hidden" : false,
"priority" : 1,
"tags" : {
},
"slaveDelay" : NumberLong(0),
"votes" : 1
},
{
"_id" : 4,
"host" : "host2:27018",
"arbiterOnly" : true,
"buildIndexes" : true,
"hidden" : false,
"priority" : 0,
"tags" : {
},
"slaveDelay" : NumberLong(0),
"votes" : 1
}
],
"settings" : {
"chainingAllowed" : true,
"heartbeatIntervalMillis" : 500,
"heartbeatTimeoutSecs" : 3,
"electionTimeoutMillis" : 3000,
"catchUpTimeoutMillis" : -1,
"catchUpTakeoverDelayMillis" : 30000,
"getLastErrorModes" : {
},
"getLastErrorDefaults" : {
"w" : 1,
"j" : false,
"wtimeout" : 0
},
"replicaSetId" : ObjectId("5f16a4e1e1c622bbea578576")
}}
Кто-нибудь может помочь мне решить эту проблему?
Обновления: Чтобы решить это поведение, в каждом файле .conf каждого узла я устанавливаю replication.enableMajorityReadConcern равным false, чтобы отключить ReadConcernMajority . В любом случае, следуя этой настройке, останавливая один из основных или дополнительных узлов, я всегда получаю следующее исключение в консоли:
Exception in thread "main" com.mongodb.MongoExecutionTimeoutException: Error waiting for snapshot not less than { ts: Timestamp(1605805914, 1), t: -1 }, current relevant optime is { ts: Timestamp(1605805864, 1), t: 71 }. :: caused by :: operation exceeded time limit
at com.mongodb.internal.connection.ProtocolHelper.createSpecialException(ProtocolHelper.java:239)
at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:171)
at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:359)
at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:280)
at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:100)
at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:490)
at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:71)
at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:259)
at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:202)
at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:118)
at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:110)
at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:345)
at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:336)
at com.mongodb.internal.operation.CommandOperationHelper.executeCommandWithConnection(CommandOperationHelper.java:222)
at com.mongodb.internal.operation.CommandOperationHelper$5.call(CommandOperationHelper.java:208)
at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:583)
at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:205)
at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:189)
at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:325)
at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:321)
at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:583)
at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:321)
at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:60)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:178)
at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:204)
at com.mongodb.client.internal.ChangeStreamIterableImpl.cursor(ChangeStreamIterableImpl.java:158)
at com.mongodb.client.internal.ChangeStreamIterableImpl.iterator(ChangeStreamIterableImpl.java:153)
at com.softstrategy.ProvaWatcher.ProvaWatcherApplication.main(ProvaWatcherApplication.java:34)
С другой стороны, если я закомментирую enableMajorityReadConcern в каждом файле .conf узлов, как это происходит по умолчанию, это исключение не появляется.
Следовательно, мои вопросы следующие два:
- Почему это исключение возникает только тогда, когда для ReadConcern установлено значение false и когда узел down является узлом, несущим данные?
- Почему, когда узел-арбитр выходит из строя, это исключение не возникает независимо от параметра ReadConcern?
Спасибо!
Комментарии:
1. можете ли вы включить вывод
rs.conf().members
и заблокироватьlog
файл для любой записи, содержащейs:"F"
Ответ №1:
В архитектуре PSA, если какой-либо из узлов, несущих данные, недоступен, у вас больше нет большинства узлов, несущих данные. Это означает, что вы сможете вставлять с помощью w: 1, но не w: большинство, и вы не сможете выполнять большинство операций чтения.
Изменение потоков за https://docs.mongodb.com/manual/reference/read-concern-majority/#disable-read-concern-majority используйте озабоченность по поводу чтения большинством голосов:
Отключение проблемы чтения «большинства» отключает поддержку потоков изменений для MongoDB 4.0 и более ранних версий. Для MongoDB 4.2 отключение «большинства» проблем чтения не влияет на доступность потоков изменений.
Это также подразумевается https://www.mongodb.com/blog/post/an-introduction-to-change-streams учитывая
Общий порядок
MongoDB 3.6 имеет глобальные логические часы, которые позволяют серверу упорядочивать все изменения в разделенном кластере. Приложения всегда будут получать изменения в том порядке, в котором они были применены к базе данных.
Полный порядок возможен только при чтении большинством голосов.
Используйте архитектуру PSS, если вы хотите, чтобы потоки изменений продолжали генерировать события, когда один из узлов, несущих данные, недоступен.
Вы также можете попробовать отключить большинство проблем с чтением в 4.2 , но у этого есть другие проблемы, как описано в первой ссылке.
Комментарии:
1. Я понимаю, что вы сосредоточены на параметре «WriteConcern», но в моем наборе реплик, прочитав файл rs.conf() (который я только что включил в ответ), вы можете увидеть, что этот параметр равен 1. Поток все равно не работает…. как вы можете это объяснить? Тем не менее, если я отключу ReadConcernMajority, когда один из узлов, несущих данные, выходит из строя, я получаю исключение MongoExecutionTimeoutException (мой вопрос о пожалуйста developer.mongodb.com/community/forums/t /… ). Я не понимаю, где я ошибаюсь….
2. Проблема чтения, а не проблема записи.
3. Обновите этот вопрос всей соответствующей информацией вместо создания повторяющихся вопросов.