Spring Boot 2.4 RSocket повторно обнаруживает потерянное соединение

#spring #rsocket

#весна #rsocket

Вопрос:

Я подключаюсь с помощью RsocketClient к RsocketServer со следующей конфигурацией. Это работает хорошо. Я хотел бы обнаружить потерю соединения. Это хорошо работает при регистрации в OnClose Mono базового rsocket RsocketClient. Но только один раз. Как я могу обнаружить закрытие, если у меня повторные потери соединения и повторные подключения?

 requester = rsocketRequesterBuilder.setupRoute("shell-client").setupData(CLIENT_ID)
                        .setupMetadata(accessToken, BearerTokenMetadata.BEARER_AUTHENTICATION_MIME_TYPE)
                        .rsocketStrategies(rsocketStrategies).
                        rsocketConnector(connector -> connector.acceptor(responder).reconnect(Retry.backoff(1000, Duration.ofMillis(500)).doBeforeRetry(x->{log.info("Retry");
                        
                        
                        
                        }).doAfterRetry(x->{log.info(x.toString());})))
                        .tcp("x.y.z.40", 8080);
                
               
                RSocket r=requester.rsocketClient().source().block();
                r.onClose().doOnError(x->{
                    log.info("Error");}).doFinally(x->{
                    log.info("Disconnected");
                }).subscribe();
 

Ответ №1:

вы можете добиться требуемого поведения, используя следующий фрагмент кода:

  requester
       .rsocketClient()
       .source()
       .flatMap(rsocket -> rsocket.onClose())
       .repeat()
       .retry()
       .subscribe();
 

в приведенном выше примере, как только соединение будет потеряно, rsocket.onClose() будет отправлен сигнал терминала. Поскольку вы установили функцию повторного подключения, следующая подписка на RSocketClient.source() приведет к восстановлению соединения. Поэтому, как только это произойдет, вы получите новый экземпляр rsocket и снова подпишетесь на onClose поток внутри flatMap operator.

Чтобы повторить эту операцию, мы можем использовать .repeat and retry so всякий onClose раз, когда завершается (что является индикатором отключения), подписка на RSocketClient.source будет повторена, и вы сможете получить новое соединение и снова начать прослушивание onClose потока