#spring #spring-webflux #project-reactor #http2
Вопрос:
Мне интересно, можно ли добиться потоковой передачи 2 способами с помощью Spring Webflux? В принципе, я хочу, чтобы клиент отправлял поток данных, которые получает сервер, сопоставлял их со строкой, а затем возвращал результат, и все это плавно, без необходимости собирать данные. Я сделал это с помощью RSocket, но мне интересно, смогу ли я получить тот же результат, используя http 2.0 (с Spring и Project-Reactor).
Пробовал делать вот так:
1 — Клиент:
public Mono<Void> stream() {
var input = Flux.range(1, 10).delayElements(Duration.ofMillis(500));
return stockWebClient.post()
.uri("/stream")
.body(BodyInserters.fromPublisher(input, Integer.class))
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class)
.log()
.then();
}
2 — Сервер:
@PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> stream(@RequestBody Integer i) {
return Flux.range(i, i 10).map(n -> String.valueOf(i)).log();
}
Или:
public Flux<String> stream(@RequestBody Flux<Integer> i) {
return i.map(n -> String.valueOf(i)).log();
}
Или:
@PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> stream(@RequestBody List<Integer> i) {
return Flux.fromIterable(i).map(n -> String.valueOf(i)).log();
}
Ни один из них не работал правильно.
Комментарии:
1. вы можете сделать это с помощью websockets
Ответ №1:
Если вы хотите использовать отправленное сервером событие, вам нужно вернуть a Flux<ServerSentEvent<String>>
.
Таким образом, ваш сервер мертод должен быть:
@PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> stream(@RequestBody Integer i) {
return Flux.range(i, i 10).map(n -> ServerSentEvent.builder(String.valueOf(n)).build());
}
Но в этом случае тело является только целым числом, и ваш клиентский код становится:
input.flatMap(i ->
stockWebClient
.post()
.uri("/stream")
.bodyValue(i)
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {})
.mapNotNull(ServerSentEvent::data)
.log())
.blockLast();
Вы также можете сделать то же самое с функциональной конечной точкой.
Если вы хотите иметь возможность передавать данные с клиента на сервер и обратно, вы не сможете использовать SSE, но вы можете добиться этого с помощью websocket.
Вам понадобится HandlerMapping и WebSocketHandler
public class TestWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.map(Integer::parseInt)
.concatMap(i -> Flux.range(i, i 10).map(String::valueOf))
.map(session::textMessage);
return session.send(output);
}
}
Конфигурация с обработчиком :
@Bean
public TestWebSocketHandler myHandler() {
return new TestWebSocketHandler();
}
@Bean
public HandlerMapping handlerMapping(final TestWebSocketHandler myHandler) {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/streamSocket", myHandler);
int order = -1; // before annotated controllers
return new SimpleUrlHandlerMapping(map, order);
}
На стороне клиента:
var input2 = Flux.range(1, 10).delayElements(Duration.ofMillis(500));
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(URI.create("http://localhost:8080/streamSocket"), session ->
session.send(input2.map(i -> session.textMessage("" i))).then(session.receive().map(WebSocketMessage::getPayloadAsText).log().then())
).block();