#spring-messaging #rsocket #rsocket-java
#spring-обмен сообщениями #rsocket #rsocket-java
Вопрос:
Я пытаюсь понять правильную конфигурацию и схему использования LoadbalanceRSocketClient
в контексте приложения SpringBoot ( RSocketRequester
).
У меня есть два серверных сервера RSocket (SpringBoot, RSocket messaging), которые работают и настраиваются RSocketRequester
на стороне клиента следующим образом:
List<LoadbalanceTarget> servers = new ArrayList<>();
for (String url: backendUrls) {
HttpClient httpClient = HttpClient.create()
.baseUrl(url)
.secure(ssl ->
ssl.sslContext(SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)));
servers.add(LoadbalanceTarget.from(url, WebsocketClientTransport.create(httpClient, url)));
}
// RSocketRequester.Builder is autowired by Spring boot
RSocketRequester requester = builder
.setupRoute("/connect")
.setupData("test")
//.rsocketConnector(connector -> connector.reconnect(Retry.fixedDelay(60, Duration.ofSeconds(1))))
.transports(Flux.just(servers), new RoundRobinLoadbalanceStrategy());
После настройки запрашивающий повторно используется в цикле таймера, как показано ниже:
@Scheduled(fixedDelay = 10000, initialDelay = 1000)
public void timer() {
requester.route("/foo").data(Data).send().block();
}
Это работает — клиент запускается, подключается к одному из серверов и отправляет на него сообщения. Если я отключу сервер, к которому подключены клиенты, клиент повторно подключится к другому серверу при следующем событии таймера. Если я снова запускаю первый сервер и отключаю второй, клиент больше не подключается, и на стороне клиента наблюдается следующее исключение:
java.util.concurrent.CancellationException: Pool is exhausted
at io.rsocket.loadbalance.RSocketPool.select(RSocketPool.java:202) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.loadbalance.LoadbalanceRSocketClient.lambda$fireAndForget$0(LoadbalanceRSocketClient.java:49) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:61) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Mono.subscribe(Mono.java:3987) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Mono.subscribe(Mono.java:3987) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Mono.block(Mono.java:1678) ~[reactor-core-3.4.0.jar:3.4.0]
Я подозреваю, что я либо неправильно настраиваю запрашивающий, либо неправильно его использую. Был бы признателен за любые подсказки, поскольку документация и тесты, похоже, довольно скудны в этой области.
В идеале я бы хотел, чтобы клиент прозрачно переключался на любой следующий доступный сервер при сбое сервера / подключения. Прямо сейчас попытка повторного подключения, похоже, происходит только при следующем вызове timer()
метода, что не идеально, поскольку клиенту необходимо обрабатывать входящие сообщения с сервера. Еще одна вещь, которую я заметил, это то, что даже в этом случае это "/foo"
маршрут FnF, если я не сделаю block()
это после send()
того, как сервер никогда не получит вызов.
Ответ №1:
Постоянно обновляйте список конечных точек
LoadbalanceClient
предназначен для интеграции со службой обнаружения, которая отвечает за поддержание List
a в Instance
рабочем состоянии. Тем не менее, если одна из служб исчезает из кластера, служба обнаружения обновляет свои List
доступные Instance
службы.
С другой стороны, для реализации выравнивания нагрузки на стороне клиента мы должны знать список доступных сервисов в кластере. Очевидно, что для настройки балансировки нагрузки мы можем получить список сервисов и передать его в API Loadbalancer.
ReactiveDiscoveryClient discoveryClient = ...
Mono<List<LoadbalanceTarget>> serversMono = discoveryClient
.getInstances(serviceGroupName)
.map(si -> {
HttpClient httpClient = HttpClient.create()
.baseUrl(si.getUri())
.secure(ssl -> ssl.sslContext(
SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
));
return LoadbalanceTarget.from(si.getUri(), WebsocketClientTransport.create(httpClient, "/rsocket")));
})
.collectList()
// RSocketRequester.Builder is autowired by Spring boot
RSocketRequester requester = builder
.setupRoute("/connect")
.setupData("test")
.transports(serversMono.flux(), new RoundRobinLoadbalanceStrategy());
Однако представьте, что мы находимся в полностью распределенной среде, и теперь каждая служба, которая исчезает и появляется снова, запускается на абсолютно новом хосте и порту (например, кластер kubernates, который не привязан к определенному IP-адресу). Тем не менее, балансировка нагрузки должна учитывать такой сценарий и, чтобы избежать мертвых узлов в пуле, она полностью удаляет неработоспособные узлы из пула.
Теперь, если все узлы исчезли и появились через некоторое время, они больше не включаются в пул (и если Flux
, который предоставляет обновления, завершен, фактически, пул исчерпан, потому что новое обновление не поступит из Flux<List<LodbalanceTarget>>
).
Однако узлы регистрируются в службе обнаружения и становятся доступными для наблюдения. Все это говорит о том, что мы должны периодически извлекать информацию из службы обнаружения, чтобы быть актуальными и постоянно обновлять состояние пула
ReactiveDiscoveryClient discoveryClient = ...
Flux<List<LoadbalanceTarget>> serversFlux = discoveryClient
.getInstances(serviceGroupName)
.map(si -> {
HttpClient httpClient = HttpClient.create()
.baseUrl(si.getUri())
.secure(ssl -> ssl.sslContext(
SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
));
return LoadbalanceTarget.from(si.getUri(), WebsocketClientTransport.create(httpClient, "/rsocket")));
})
.collectList()
.repeatWhen(f -> f.delayElements(Duration.ofSeconds(1))) // <- continuously retrieve new List of ServiceInstances
// RSocketRequester.Builder is autowired by Spring boot
RSocketRequester requester = builder
.setupRoute("/connect")
.setupData("test")
.transports(servers, new RoundRobinLoadbalanceStrategy());
При такой настройке RSocketPool
не будет исчерпан, если все узлы исчезнут из кластера, потому Flux<List<LoadbalanceTraget>>
что он еще не завершен и может в конечном итоге предоставить новые обновления.
Обратите внимание, реализация достаточно умна, чтобы поддерживать активные узлы при каждом обновлении из службы обнаружения. Тем не менее, если в пуле есть такой экземпляр службы, вы не получите 2 соединения одновременно.
Примечание о функции повторного подключения
Вы можете заметить, что RSocketConnector
это обеспечивает такую замечательную функцию под названием .reconnect
. На первый взгляд может показаться, что использование reconnect
будет поддерживать ваше соединение в рабочем состоянии бесконечно. К сожалению, это не так. Эта .reconnect
функция предназначена для обеспечения Mono<RSocket>
возможности повторного использования с семантикой кэша, что означает, что вы можете создавать @Bean Mono<RSocket> ...
и автоматически подключать его в разных местах и subscribe
несколько раз, не беспокоясь о том, что результат RSocket instance
будет разным для каждого Mono<RSocket>.subscribe
. С другой стороны .reconnect
, если given RSocket
отключается (например, в случае потери соединения), следующая подписка на такую подписку Mono<RSocket>
будет противостоять новой RSocket
только один раз для всех одновременных .subscribe
вызовов.
Хотя это звучит как полезная функция, в RSocketPool
мы не слишком полагаемся на нее и используем Mono<RSocket>
только один раз для разрешения и кэширования экземпляра RSocket внутри RSocketPool. Тем не менее, если такой RSocket будет отключен, мы не будем пытаться подписаться на данный Mono<RSocket>
снова (мы предполагаем, что настроенный хост и порт будут изменены)
Комментарии:
1. Спасибо за очень проницательный ответ, Олег.
Ответ №2:
Что касается вопроса о FnF, это часть модели Rx. Без подписки событие не происходит. Вы можете вызвать API, возвращающий Mono без побочных эффектов перед подписью, любое другое поведение является ошибкой.
/**
* Perform a Fire-and-Forget interaction via {@link RSocket#fireAndForget(Payload)}. Allows
* multiple subscriptions and performs a request per subscriber.
*/
Mono<Void> fireAndForget(Mono<Payload> payloadMono);
Если вы вызовете этот метод один раз, а затем подпишитесь 3 раза на результат, он выполнит его 3 раза.
Ответ №3:
Олег, я попробовал то, что ты предложил, и это в какой-то степени работает, хотя я все еще не могу добиться нужного мне поведения.
Что я хочу сделать, так это:
- Клиент подключается к одному (случайному) бэкэнду одновременно
- Если серверная часть или подключение к серверной части не удается, клиент должен попытаться подключиться к следующей доступной серверной части.
Я думаю, я не могу использовать RoundRobinLoadbalanceStrategy
, поскольку он соединяет клиента со всеми доступными бэкэндами. Должен ли я использовать WeightedLoadbalanceStrategy
вместо этого? Или discoveryClient
абстракция должна каждый раз возвращать только один сервер, но это больше не будет клиентом «пула», верно?
Возможно, мне следует переосмыслить подход в целом. У меня есть несколько десятков тысяч клиентов, поэтому я хочу сбалансировать нагрузку на серверную часть — распределить ее по нескольким экземплярам серверной части, чтобы каждый клиент случайным образом подключался к одному экземпляру серверной части, но мог повторно подключаться к другому экземпляру, если экземпляр, к которому он подключен, терпит неудачу. Я предполагаю, что это не очень хорошая идея для одновременного подключения всех клиентов к каждому экземпляру серверной части, но, может быть, я ошибаюсь?
Комментарии:
1. 1. Похоже, вам нужно одно соединение за раз, а затем вернуться к другому (балансировка нагрузки предназначена для нескольких соединений и распределения нагрузки между ними) 2. Возможно, вы захотите использовать
WeightedLoadbalanceStrategy
, он будет продолжать отправлять сообщения на одно и то же соединение, если задержка хорошая 3. Добро пожаловать в чат gitter, если вы хотите больше общаться по теме -> gitter.im/rsocket /…