Spring reactive WebClient — как вызывать методы в Mono

#java #spring #reactive-programming #spring-webflux #spring-webclient

#java #spring #реактивное программирование #spring-webflux #spring-webclient

Вопрос:

Новичок в реактивном программировании и пытаюсь создать реактивный сервис с помощью WebFlux и WebClient.

Поток метода похож

  1. ОТПРАВИТЬ запрос и дождаться ответа
  2. Тело ответа на службу сопоставления (которая имеет другую бизнес-логику) и которая возвращает тип рекомендаций
  3. Создайте ResponseEntity
  4. Создайте Mono типа Mono<ResponseEntity>

Вопрос: является ли это допустимым способом сделать это, как если бы я использовал .exchange()? и есть ли способ объединить эти методы в цепочку вместо отдельных методов

текущая реализация:

 private Mono<ResponseEntity<Recommendations>> myMethod(final Request request, final String variantName) {

    Mono<String> response = webClient.build()
            .post()
            .uri(uri)
            .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
            .bodyValue(requestBody)
            .retrieve().bodyToMono(String.class);

    var recommendations = ((XYZResponseMapper) responseMapper).mapReactive(request, response, useCaseId, variantName); //return type Recommendations
    var entity = new ResponseEntity<>(recommendations, nullHeaders, HttpStatus.OK);
    return Mono.just(entity);

}
  

Ответ №1:

Короткий ответ, вероятно, нет.

Более длинная версия заключается в том, что когда кто-то начинает подписываться на вашу службу (которая является производителем), клиент хочет использовать данные. Как только начнется подписка, webflux создаст реактивную цепочку внутри приложения. Эту цепочку можно сравнить с цепочкой обратного вызова и она называется «фазой сборки».

На этом этапе сборки важно, чтобы возвраты каждого потока / Mono были привязаны друг к другу. В противном случае вы разрываете цепочку.

 var firstAction = Mono.just("Hello").flatMap(value -> {
    // do something
});

// Next action needs to chain on the last
var secondAction = firstAction.flatMap(value -> {
    // Do the next thing
});

// Can be combined
var bothActions = Mono.just("Hello").flatMap(value -> {
    // do something
}).flatMap(value -> {
    // do next thing
});

  

В приведенном выше примере вы можете видеть, что я постоянно цепляюсь за последнее действие, мы не разрываем цепочку.

Теперь перейдем к вашему коду.

 private Mono<ResponseEntity<Recommendations>> myMethod(final Request request, final String variantName) {

    // Here you have a response
    Mono<String> response = webClient.build()
            .post()
            .uri(uri)
            // Not needed content type will default to json
            .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            // will also default to json
            .header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
            .bodyValue(requestBody)
            .retrieve().bodyToMono(String.class);

    // here you pass the response into a function, hence probably breaking the chain
    var recommendations = ((XYZResponseMapper) responseMapper).mapReactive(request, response, useCaseId, variantName); //return type Recommendations
    var entity = new ResponseEntity<>(recommendations, nullHeaders, HttpStatus.OK);

    // Here you are suddenly creating a new mono, which tells me you deffo broke the chain and need to recreate it by doing a mono#just
    return Mono.just(entity);

}
  

Итак, как нам это решить?

 private Mono<Recommendations> myMethod(final Request request, final String variantName) {
    // You should not build a webclient on each request, you should build it in a @Bean
    final Mono<XYZResponse> response = webClient.build()
            .post()
            .uri(uri)
            .bodyValue(requestBody)
            .retrieve()
            // Map into a class representation to uphold type safety
            .bodyToMono(XYZResponse.class);

    // if we need to do additional transformations we can flatMap and chain on
    final Mono<Recommendations> recommendations = response.flatMap(value -> {
        var recommendations = mapper.toRecommendations(value);
    });

    // No need to wrap it in a response webflux will do that for you automatically when you return it to the client
    return recommendations;
}
  

затем мы можем даже переписать его еще короче.

 private Mono<Recommendations> myMethod(final Request request, final String variantName) {
    return webClient.build()
            .post()
            .uri(uri)
            .bodyValue(requestBody)
            .retrieve()
            .bodyToMono(XYZResponse.class)
            .flatMap(value -> {
                var recommendations = mapper.toRecommendations(value);
            });
}
  

Обычно я пишу оператор return в первой строке, а затем пишу свою цепочку после этого.

если вы хотите оптимизировать его еще больше (и это обычно рекомендуется), вы создаете webclient в компоненте конфигурации, чтобы webclient создавался только один раз (при запуске сервера webflux), а затем мы повторно используем его в каждом запросе.

 @Configuration
public class ClientConfig {

    @Bean
    public WebClient webclient(WebClient.Builder webClient) {
        return webClient.baseUrl( ... )
                .build();
    }
}

@Component
public class RecommendationHandler {

    final private WebClient 

    @Autowire
    public RecommendationHandler(WebClient webClient) {
        this.webClient = webClient;
    }

    private Mono<Recommendations> getRecommendations(RequestBody requestBody) {
    return webClient
            .post()
            .bodyValue(requestBody)
            .retrieve()
            .bodyToMono(XYZResponse.class)
            .flatMap(value -> {
                var recommendations = mapper.toRecommendations(value);
            });
    }   
}
  

Что-то вроде этого. Это всего лишь пример, я не запускал его в IDE, я просто написал это из головы. Но это некоторые из вещей, о которых я подумал, в этом нет обработки ошибок, которая должна быть адресной.

Удачи

Ответ №2:

После долгого чтения и экспериментов мне удалось заставить его работать со следующим:

     return webClient.build()
            .post()
            .uri(uri)
            .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
            .bodyValue(requestBody)
            .retrieve()
            .toEntity(String.class)
            .publishOn(Schedulers.boundedElastic())
            .map(x -> {
           
                var recs = processResponse(request, x.getBody(), useCaseId, variantName);
                return new ResponseEntity<GatewayRecommendations>(recs, x.getStatusCode());
            });