Правильное использование LoadbalanceRSocketClient с RSocketRequester Spring

#spring-messaging #rsocket #rsocket-java

#spring-обмен сообщениями #rsocket #rsocket-java

Вопрос:

Я пытаюсь понять правильную конфигурацию и схему использования LoadbalanceRSocketClient в контексте приложения SpringBoot ( RSocketRequester ).

У меня есть два серверных сервера RSocket (SpringBoot, RSocket messaging), которые работают и настраиваются RSocketRequester на стороне клиента следующим образом:

 List<LoadbalanceTarget> servers = new ArrayList<>();
for (String url: backendUrls) {
  HttpClient httpClient = HttpClient.create()
    .baseUrl(url)
    .secure(ssl -> 
       ssl.sslContext(SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)));
  servers.add(LoadbalanceTarget.from(url, WebsocketClientTransport.create(httpClient, url)));
}

// RSocketRequester.Builder is autowired by Spring boot
RSocketRequester requester = builder
  .setupRoute("/connect")
  .setupData("test")
  //.rsocketConnector(connector -> connector.reconnect(Retry.fixedDelay(60, Duration.ofSeconds(1))))
 .transports(Flux.just(servers), new RoundRobinLoadbalanceStrategy());   
 

После настройки запрашивающий повторно используется в цикле таймера, как показано ниже:

 @Scheduled(fixedDelay = 10000, initialDelay = 1000)
public void timer() {
  requester.route("/foo").data(Data).send().block();
}
 

Это работает — клиент запускается, подключается к одному из серверов и отправляет на него сообщения. Если я отключу сервер, к которому подключены клиенты, клиент повторно подключится к другому серверу при следующем событии таймера. Если я снова запускаю первый сервер и отключаю второй, клиент больше не подключается, и на стороне клиента наблюдается следующее исключение:

 java.util.concurrent.CancellationException: Pool is exhausted
    at io.rsocket.loadbalance.RSocketPool.select(RSocketPool.java:202) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.loadbalance.LoadbalanceRSocketClient.lambda$fireAndForget$0(LoadbalanceRSocketClient.java:49) ~[rsocket-core-1.1.0.jar:na]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:61) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3987) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3987) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Mono.block(Mono.java:1678) ~[reactor-core-3.4.0.jar:3.4.0]
 

Я подозреваю, что я либо неправильно настраиваю запрашивающий, либо неправильно его использую. Был бы признателен за любые подсказки, поскольку документация и тесты, похоже, довольно скудны в этой области.

В идеале я бы хотел, чтобы клиент прозрачно переключался на любой следующий доступный сервер при сбое сервера / подключения. Прямо сейчас попытка повторного подключения, похоже, происходит только при следующем вызове timer() метода, что не идеально, поскольку клиенту необходимо обрабатывать входящие сообщения с сервера. Еще одна вещь, которую я заметил, это то, что даже в этом случае это "/foo" маршрут FnF, если я не сделаю block() это после send() того, как сервер никогда не получит вызов.

Ответ №1:

Постоянно обновляйте список конечных точек

LoadbalanceClient предназначен для интеграции со службой обнаружения, которая отвечает за поддержание List a в Instance рабочем состоянии. Тем не менее, если одна из служб исчезает из кластера, служба обнаружения обновляет свои List доступные Instance службы.

С другой стороны, для реализации выравнивания нагрузки на стороне клиента мы должны знать список доступных сервисов в кластере. Очевидно, что для настройки балансировки нагрузки мы можем получить список сервисов и передать его в API Loadbalancer.

 ReactiveDiscoveryClient discoveryClient = ...

Mono<List<LoadbalanceTarget>> serversMono = discoveryClient
    .getInstances(serviceGroupName)
    .map(si -> {
        HttpClient httpClient = HttpClient.create()
          .baseUrl(si.getUri())
          .secure(ssl -> ssl.sslContext(
              SslContextBuilder.forClient()
                         .trustManager(InsecureTrustManagerFactory.INSTANCE)
          ));
        return LoadbalanceTarget.from(si.getUri(), WebsocketClientTransport.create(httpClient, "/rsocket")));
    })
    .collectList()

// RSocketRequester.Builder is autowired by Spring boot
RSocketRequester requester = builder
  .setupRoute("/connect")
  .setupData("test")
  .transports(serversMono.flux(), new RoundRobinLoadbalanceStrategy());   

 

Однако представьте, что мы находимся в полностью распределенной среде, и теперь каждая служба, которая исчезает и появляется снова, запускается на абсолютно новом хосте и порту (например, кластер kubernates, который не привязан к определенному IP-адресу). Тем не менее, балансировка нагрузки должна учитывать такой сценарий и, чтобы избежать мертвых узлов в пуле, она полностью удаляет неработоспособные узлы из пула.

Теперь, если все узлы исчезли и появились через некоторое время, они больше не включаются в пул (и если Flux , который предоставляет обновления, завершен, фактически, пул исчерпан, потому что новое обновление не поступит из Flux<List<LodbalanceTarget>> ).

Однако узлы регистрируются в службе обнаружения и становятся доступными для наблюдения. Все это говорит о том, что мы должны периодически извлекать информацию из службы обнаружения, чтобы быть актуальными и постоянно обновлять состояние пула

 ReactiveDiscoveryClient discoveryClient = ...

Flux<List<LoadbalanceTarget>> serversFlux = discoveryClient
    .getInstances(serviceGroupName)
    .map(si -> {
        HttpClient httpClient = HttpClient.create()
          .baseUrl(si.getUri())
          .secure(ssl -> ssl.sslContext(
              SslContextBuilder.forClient()
                         .trustManager(InsecureTrustManagerFactory.INSTANCE)
          ));
        return LoadbalanceTarget.from(si.getUri(), WebsocketClientTransport.create(httpClient, "/rsocket")));
    })
    .collectList()
    .repeatWhen(f -> f.delayElements(Duration.ofSeconds(1))) // <- continuously retrieve new List of ServiceInstances

// RSocketRequester.Builder is autowired by Spring boot
RSocketRequester requester = builder
  .setupRoute("/connect")
  .setupData("test")
  .transports(servers, new RoundRobinLoadbalanceStrategy());
 

При такой настройке RSocketPool не будет исчерпан, если все узлы исчезнут из кластера, потому Flux<List<LoadbalanceTraget>> что он еще не завершен и может в конечном итоге предоставить новые обновления.

Обратите внимание, реализация достаточно умна, чтобы поддерживать активные узлы при каждом обновлении из службы обнаружения. Тем не менее, если в пуле есть такой экземпляр службы, вы не получите 2 соединения одновременно.

Примечание о функции повторного подключения

Вы можете заметить, что RSocketConnector это обеспечивает такую замечательную функцию под названием .reconnect . На первый взгляд может показаться, что использование reconnect будет поддерживать ваше соединение в рабочем состоянии бесконечно. К сожалению, это не так. Эта .reconnect функция предназначена для обеспечения Mono<RSocket> возможности повторного использования с семантикой кэша, что означает, что вы можете создавать @Bean Mono<RSocket> ... и автоматически подключать его в разных местах и subscribe несколько раз, не беспокоясь о том, что результат RSocket instance будет разным для каждого Mono<RSocket>.subscribe . С другой стороны .reconnect , если given RSocket отключается (например, в случае потери соединения), следующая подписка на такую подписку Mono<RSocket> будет противостоять новой RSocket только один раз для всех одновременных .subscribe вызовов.

Хотя это звучит как полезная функция, в RSocketPool мы не слишком полагаемся на нее и используем Mono<RSocket> только один раз для разрешения и кэширования экземпляра RSocket внутри RSocketPool. Тем не менее, если такой RSocket будет отключен, мы не будем пытаться подписаться на данный Mono<RSocket> снова (мы предполагаем, что настроенный хост и порт будут изменены)

Комментарии:

1. Спасибо за очень проницательный ответ, Олег.

Ответ №2:

Что касается вопроса о FnF, это часть модели Rx. Без подписки событие не происходит. Вы можете вызвать API, возвращающий Mono без побочных эффектов перед подписью, любое другое поведение является ошибкой.

   /**
   * Perform a Fire-and-Forget interaction via {@link RSocket#fireAndForget(Payload)}. Allows
   * multiple subscriptions and performs a request per subscriber.
   */
  Mono<Void> fireAndForget(Mono<Payload> payloadMono);
 

Если вы вызовете этот метод один раз, а затем подпишитесь 3 раза на результат, он выполнит его 3 раза.

Ответ №3:

Олег, я попробовал то, что ты предложил, и это в какой-то степени работает, хотя я все еще не могу добиться нужного мне поведения.

Что я хочу сделать, так это:

  • Клиент подключается к одному (случайному) бэкэнду одновременно
  • Если серверная часть или подключение к серверной части не удается, клиент должен попытаться подключиться к следующей доступной серверной части.

Я думаю, я не могу использовать RoundRobinLoadbalanceStrategy , поскольку он соединяет клиента со всеми доступными бэкэндами. Должен ли я использовать WeightedLoadbalanceStrategy вместо этого? Или discoveryClient абстракция должна каждый раз возвращать только один сервер, но это больше не будет клиентом «пула», верно?

Возможно, мне следует переосмыслить подход в целом. У меня есть несколько десятков тысяч клиентов, поэтому я хочу сбалансировать нагрузку на серверную часть — распределить ее по нескольким экземплярам серверной части, чтобы каждый клиент случайным образом подключался к одному экземпляру серверной части, но мог повторно подключаться к другому экземпляру, если экземпляр, к которому он подключен, терпит неудачу. Я предполагаю, что это не очень хорошая идея для одновременного подключения всех клиентов к каждому экземпляру серверной части, но, может быть, я ошибаюсь?

Комментарии:

1. 1. Похоже, вам нужно одно соединение за раз, а затем вернуться к другому (балансировка нагрузки предназначена для нескольких соединений и распределения нагрузки между ними) 2. Возможно, вы захотите использовать WeightedLoadbalanceStrategy , он будет продолжать отправлять сообщения на одно и то же соединение, если задержка хорошая 3. Добро пожаловать в чат gitter, если вы хотите больше общаться по теме -> gitter.im/rsocket /…