Повторное подключение RSocket не работает при промежуточном перезапуске сервера

#spring-boot #rsocket #spring-boot-rsocket

#весенняя загрузка #rsocket #spring-boot-rsocket

Вопрос:

Я пытаюсь протестировать функцию повторного подключения RSocket, но она не работает, когда между ними был отключен сервер и перезапущен.

Шаг 1: запуск клиента при отключенном сервере

 16:02:35.351 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 55104 (auto-detected)
16:02:35.515 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 10:e7:c6:ff:fe:31:38:c0 (auto-detected)
16:02:35.560 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x7b01559f] Created a new pooled channel, now 1 active connections and 0 inactive connections
16:02:35.569 [reactor-tcp-nio-2] DEBUG reactor.netty.transport.TransportConfig - [id: 0x7b01559f] Initialized pipeline DefaultChannelPipeline{(reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
doAfterRetry ===>io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999
16:02:36.619 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x8a39ea36] Created a new pooled channel, now 1 active connections and 0 inactive connections
16:02:36.620 [reactor-tcp-nio-2] DEBUG reactor.netty.transport.TransportConfig - [id: 0x8a39ea36] Initialized pipeline DefaultChannelPipeline{(reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
doAfterRetry ===>io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999
16:02:37.625 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x9754e5dd] Created a new pooled channel, now 1 active connections and 0 inactive connections
16:02:37.625 [reactor-tcp-nio-2] DEBUG reactor.netty.transport.TransportConfig - [id: 0x9754e5dd] Initialized pipeline DefaultChannelPipeline{(reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
doAfterRetry ===>io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999
 

Шаг 2: Сервер запущен. Повторное подключение сработало

 16:08:31.359 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1] Created a new pooled channel, now 1 active connections and 0 inactive connections
16:08:31.359 [reactor-tcp-nio-2] DEBUG reactor.netty.transport.TransportConfig - [id: 0x67b9f3a1] Initialized pipeline DefaultChannelPipeline{(reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
16:08:31.862 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999] Registering pool release on close event for channel
16:08:31.863 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999] Channel connected, now 1 active connections and 0 inactive connections
16:08:31.863 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999] onStateChange(PooledConnection{channel=[id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999]}, [connected])
16:08:31.865 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999] onStateChange(ChannelOperations{PooledConnection{channel=[id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999]}}, [configured])
16:08:31.891 [reactor-tcp-nio-2] DEBUG io.rsocket.FrameLogger - sending -> 
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:

16:08:31.957 [reactor-tcp-nio-2] DEBUG reactor.netty.channel.FluxReceive - [id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999] FluxReceive{pending=0, cancelled=false, inboundDone=false, inboundError=null}: subscribing inbound receiver
16:08:31.969 [reactor-tcp-nio-2] DEBUG io.rsocket.FrameLogger - sending -> 
Frame => Stream ID: 1 Type: REQUEST_STREAM Flags: 0b100000000 Length: 57 InitialRequestN: 9223372036854775807
Metadata:

 

Шаг 3: сбил сервер

 16:10:10.993 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 ! R:localhost/127.0.0.1:7999] Channel closed, now 0 active connections and 0 inactive connections
null
16:10:10.999 [reactor-tcp-nio-2] DEBUG org.springframework.core.codec.CharSequenceEncoder - Writing "ClientName:1607847010998"
16:10:11.008 [reactor-tcp-nio-2] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.nio.channels.ClosedChannelException
Caused by: java.nio.channels.ClosedChannelException: null
16:10:11.008 [reactor-tcp-nio-2] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.nio.channels.ClosedChannelException
Caused by: java.nio.channels.ClosedChannelException: null
16:10:11.010 [reactor-tcp-nio-2] DEBUG reactor.netty.ReactorNetty - [id: 0x67b9f3a1, L:/127.0.0.1:57689 ! R:localhost/127.0.0.1:7999] Non Removed handler: RSocketLengthCodec, context: ChannelHandlerContext(RSocketLengthCodec, [id: 0x67b9f3a1, L:/127.0.0.1:57689 ! R:localhost/127.0.0.1:7999]), pipeline: DefaultChannelPipeline{(RSocketLengthCodec = io.rsocket.transport.netty.RSocketLengthCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
16:10:11.010 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 ! R:localhost/127.0.0.1:7999] onStateChange(ChannelOperations{PooledConnection{channel=[id: 0x67b9f3a1, L:/127.0.0.1:57689 ! R:localhost/127.0.0.1:7999]}}, [disconnecting])
 

RSocket не подключался повторно после вышеуказанного шага. Ниже приведена моя программа. Может кто-нибудь, пожалуйста, помочь просмотреть и посоветовать, что с этим не так.

         RSocketStrategies strategies = RSocketStrategies.builder()
                .encoders(e -> e.add(new Jackson2JsonEncoder()))
                .decoders(e -> e.add(new Jackson2JsonDecoder()))
                .build();

        RSocketRequester r = RSocketRequester.builder()
                .rsocketConnector(connector ->
                        connector.reconnect(Retry.indefinitely().doAfterRetry(e->  System.out.println("doAfterRetry ===>" e.failure())))
                ).dataMimeType(MediaType.APPLICATION_JSON)
                .rsocketStrategies(strategies)
                .tcp("localhost", 7999);

 

Обновлено 15 / декабрь

Следующий код отправляет запрос после шага 2. После отключения он не смог возобновить поток. Я уверен, что мне чего-то не хватает в моем коде. Пожалуйста, помогите

 requester.route("route_name")
                .data("RequestData")
                .retrieveFlux(MyResponse.class)
                .doOnError(ex ->{
                    System.out.println("doOnError" ex);
                }).doOnCancel(()->{
                    System.out.println("doOnCancel");
                }).doOnComplete(()-> {
                    System.out.println("doOnCancel");
                })
                .subscribe(result -> {
                    System.out.println("===>" result);
                });
 

Комментарии:

1. Вам нужно показать код, который выполняет запросы.

2. @RossenStoyanchev Я обновил свой пост. Пожалуйста, проверьте

Ответ №1:

Метод повторного подключения имеет обширный Javadoc. Основная цель этой функции — установить одно общее соединение, независимо от того, сколько подписчиков может быть одновременно:

Когда это включено, методы connect этого класса возвращают специальный Mono, который поддерживает один общий RSocket

Retry Определяет, как долго продолжать попытки подключиться, прежде чем отказаться, но как только он прекратит попытки или как только соединение будет установлено, а затем потеряно, он не будет автоматически пытаться подключиться снова.

нижестоящим абонентам для отдельных запросов по-прежнему требуется собственная логика повторных попыток, чтобы определить, следует ли повторять неудачные запросы, что, в свою очередь, запускает совместное повторное подключение

Таким образом, потребности каждого отдельного запроса определяют, следует ли вообще пытаться подключиться снова, в то время Retry как заданный to reconnect определяет логику повторных попыток для каждого общего повторного подключения.

Не забудьте также просмотреть соответствующие фрагменты кода в Javadoc.

Комментарии:

1. Спасибо Россену. Позвольте мне взглянуть на документы Java, соответствующие методам. Кстати, есть ли какое-либо событие обработчика (doOnxxx) в RSocket, которое уведомляет об успешном подключении?

2. ничего нового, потому что у вас похожая проблема?