Весенние данные MongoDB останавливают потоковое наблюдение без ошибок

#java #spring #mongodb #spring-data #spring-webflux

Вопрос:

У меня есть поток, наблюдающий за моей коллекцией, который я хотел бы остановить через несколько секунд. Я могу остановить это, однако это всегда приводит меня к ошибке.

Это мой поток :

 
ChangeStreamOptions options = ChangeStreamOptions.builder()
            .filter(newAggregation(Job.class, match(where("operationType").in("insert"))))
            .build();

    Flux<ChangeStreamEvent<Job>> jobs = reactiveMongoTemplate.changeStream("jobs", options, Job.class);

    jobs
       .doOnNext(x -> {
      log.info("Notif body received={}", x.getBody());
    })
       .doOnCancel(() -> {
            log.info("cancelling consuming...");})
       .take(Duration.ofSeconds(10))
    .subscribe()

;
 

Лигн .take(Duration.ofSeconds(10)) останавливает поток, как и ожидалось.

Через десять секунд после последнего полученного уведомления поток останавливается, но возникает ошибка :

 2021-06-30 19:21:03.448  INFO 51694 --- [     parallel-1] c.p.c.JobChangeStreamService             : cancelling consuming...
2021-06-30 19:21:04.339 ERROR 51694 --- [       Thread-8] org.mongodb.driver.operation             : Callback onResult call produced an error

com.mongodb.MongoException: state should be: open
    at com.mongodb.MongoException.fromThrowableNonNull(MongoException.java:79) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.async.client.AbstractSubscription.onError(AbstractSubscription.java:135) ~[mongodb-driver-async-3.11.2.jar:na]
    at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:97) ~[mongodb-driver-async-3.11.2.jar:na]
    at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:89) ~[mongodb-driver-async-3.11.2.jar:na]
    at com.mongodb.operation.AsyncChangeStreamBatchCursor$3.onResult(AsyncChangeStreamBatchCursor.java:133) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.operation.AsyncChangeStreamBatchCursor$3.onResult(AsyncChangeStreamBatchCursor.java:129) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.operation.AsyncChangeStreamBatchCursor$4.onResult(AsyncChangeStreamBatchCursor.java:168) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.operation.AsyncChangeStreamBatchCursor$4.onResult(AsyncChangeStreamBatchCursor.java:159) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:331) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:310) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:242) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:83) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:467) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:111) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:401) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:376) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:677) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:644) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203) ~[mongodb-driver-core-3.11.2.jar:na]
    at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127) ~[na:na]
    at java.base/sun.nio.ch.Invoker.invokeDirect(Invoker.java:158) ~[na:na]
    at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:568) ~[na:na]
    at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:276) ~[na:na]
    at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:297) ~[na:na]
    at com.mongodb.internal.connection.AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(AsynchronousSocketChannelStream.java:137) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:105) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:511) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.access$1000(InternalStreamConnection.java:76) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:634) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:619) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203) ~[mongodb-driver-core-3.11.2.jar:na]
    at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127) ~[na:na]
    at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishRead(UnixAsynchronousSocketChannelImpl.java:443) ~[na:na]
    at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:193) ~[na:na]
    at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:215) ~[na:na]
    at java.base/sun.nio.ch.KQueuePort$EventHandlerTask.run(KQueuePort.java:312) ~[na:na]
    at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: java.lang.IllegalStateException: state should be: open
    at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.session.BaseClientSessionImpl.advanceOperationTime(BaseClientSessionImpl.java:107) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.session.ClientSessionContext.advanceOperationTime(ClientSessionContext.java:70) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.ClusterClockAdvancingSessionContext.advanceOperationTime(ClusterClockAdvancingSessionContext.java:76) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.updateSessionContext(InternalStreamConnection.java:537) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.access$800(InternalStreamConnection.java:76) ~[mongodb-driver-core-3.11.2.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:385) ~[mongodb-driver-core-3.11.2.jar:na]
    ... 31 common frames omitted

 

Знаете ли вы, как избежать возникновения этой ошибки ? Есть ли другой способ остановить этот поток ?

Спасибо!

Комментарии:

1. I'm able to stop it пожалуйста, обновите информацию о том, как вы его останавливаете.

2. это .take(Duration.ofSeconds(10)) заставляет часы останавливаться, я тоже пытался .timeout(Duration.ofSeconds(10)) , однако ошибка выше всегда выбрасывается

3. я не понимаю, take ничего не «останавливает», документация для take такова Take only the first N values from this Flux, if available. If n is zero, the source is subscribed to but immediately cancelled, then the operator completes. , что она берет 10 элементов из потока, если есть 10 элементов, если их нет, она отменит, поэтому она резко оборвет соединение с базой данных, и база данных выдаст эту ошибку. Чтобы отключиться от базы данных, вы должны закрыть соединение, например, позвонив MongoClient.close() , чтобы правильно закрыть соединение и освободить ресурсы.

4. Моя вина, я использовал плохой способ остановить часы. я думаю, что timeout () — лучшее решение. Так должен ли я закрыть соединение внутри моего doOnCancel ?

5. это один из способов сделать это, если таково ваше намерение.