2 способа потоковой передачи с помощью spring webflux

#spring #spring-webflux #project-reactor #http2

Вопрос:

Мне интересно, можно ли добиться потоковой передачи 2 способами с помощью Spring Webflux? В принципе, я хочу, чтобы клиент отправлял поток данных, которые получает сервер, сопоставлял их со строкой, а затем возвращал результат, и все это плавно, без необходимости собирать данные. Я сделал это с помощью RSocket, но мне интересно, смогу ли я получить тот же результат, используя http 2.0 (с Spring и Project-Reactor).

Пробовал делать вот так:

1 — Клиент:

   public Mono<Void> stream() {
    var input = Flux.range(1, 10).delayElements(Duration.ofMillis(500));
    return stockWebClient.post()
            .uri("/stream")
            .body(BodyInserters.fromPublisher(input, Integer.class))
            .accept(MediaType.TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(String.class)
            .log()
            .then();
  }
 

2 — Сервер:

 @PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  public Flux<String> stream(@RequestBody Integer i) {
    return Flux.range(i, i 10).map(n -> String.valueOf(i)).log();
  }
 

Или:

 public Flux<String> stream(@RequestBody Flux<Integer> i) {
    return i.map(n -> String.valueOf(i)).log();
  }
 

Или:

 @PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  public Flux<String> stream(@RequestBody List<Integer> i) {
    return Flux.fromIterable(i).map(n -> String.valueOf(i)).log();
  }
 

Ни один из них не работал правильно.

Комментарии:

1. вы можете сделать это с помощью websockets

Ответ №1:

Если вы хотите использовать отправленное сервером событие, вам нужно вернуть a Flux<ServerSentEvent<String>> .

Таким образом, ваш сервер мертод должен быть:

     @PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> stream(@RequestBody Integer i) {
        return Flux.range(i, i   10).map(n -> ServerSentEvent.builder(String.valueOf(n)).build());
    }
 

Но в этом случае тело является только целым числом, и ваш клиентский код становится:

 input.flatMap(i ->
        stockWebClient
                .post()
                .uri("/stream")
                .bodyValue(i)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {})
                .mapNotNull(ServerSentEvent::data)
                .log())
                .blockLast();
 

Вы также можете сделать то же самое с функциональной конечной точкой.

Если вы хотите иметь возможность передавать данные с клиента на сервер и обратно, вы не сможете использовать SSE, но вы можете добиться этого с помощью websocket.

Вам понадобится HandlerMapping и WebSocketHandler

 public class TestWebSocketHandler implements WebSocketHandler {
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        Flux<WebSocketMessage> output = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .map(Integer::parseInt)
                .concatMap(i -> Flux.range(i, i   10).map(String::valueOf))
                .map(session::textMessage);
        return session.send(output);
    }
}
 

Конфигурация с обработчиком :

 @Bean
    public TestWebSocketHandler myHandler() {
        return new TestWebSocketHandler();
    }

    @Bean
    public HandlerMapping handlerMapping(final TestWebSocketHandler myHandler) {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/streamSocket", myHandler);
        int order = -1; // before annotated controllers
        return new SimpleUrlHandlerMapping(map, order);
    }
 

На стороне клиента:

 var input2 = Flux.range(1, 10).delayElements(Duration.ofMillis(500));
        WebSocketClient client = new ReactorNettyWebSocketClient();
        client.execute(URI.create("http://localhost:8080/streamSocket"), session ->
                session.send(input2.map(i -> session.textMessage(""   i))).then(session.receive().map(WebSocketMessage::getPayloadAsText).log().then())
        ).block();