Сервер блокировки одновременных вызовов

#java #spring-boot #reactive-programming #spring-webflux #project-reactor

#java #весенняя загрузка #реактивное программирование #spring-webflux #проект-реактор

Вопрос:

У нас есть служба, которая медленно отвечает в среднем на 2 секунды, какую бы полезную нагрузку мы ни отправляли. Мы обрабатываем несколько запросов, чтобы сэкономить некоторую пропускную способность и не заглушить эту службу.

У меня есть этот клиент WebClient для сбора и выполнения запросов.

 public class MyClient {
    private final WebClient client;
    private final Sinks.Many<String> requestSink;
    private final Flux<Map.Entry<String, TrackStatus>> output;

    public MyClient(URI uri, int queueSize, Duration timeout) {
        this.client = WebClient.create(uri.toString());
        this.requestSink = Sinks.many().multicast().onBackpressureBuffer(100, false);
        this.output = this.requestSink.asFlux().bufferTimeout(queueSize, timeout)
                .publishOn(Schedulers.boundedElastic())
                .flatMap(orderNumbers -> this.client.get()
                        .uri(uriBuilder -> uriBuilder
                                .path("/orders")
                                .queryParam("q", orderNumbers)
                                .build())
                        .retrieve()
                        .bodyToMono(Response.class)
                        .publishOn(Schedulers.boundedElastic())
                        .timeout(Duration.ofSeconds(3))
                        .onErrorReturn(this.createEmptyResponse(orderNumbers)))
                .flatMap(shipmentsResponse -> Flux.fromIterable(shipmentsResponse.getStatuses().entrySet()));
    }

    private Response createEmptyResponse(List<String> orderNumbers) {
        var response = new Response();

        for (String orderNumber : orderNumbers) {
            response.addStatus(orderNumber, null);
        }

        return response;
    }

    public void addOrderNumber(String orderNumber) {
        var emitResult = this.requestSink.tryEmitNext(orderNumber);

        while (emitResult.isFailure()) {
            emitResult = this.requestSink.tryEmitNext(orderNumber);
        }
    }

    public Flux<Map.Entry<String, TrackStatus>> output() {
        return this.output;
    }
}

public class Response {
    private Map<String, Status> statuses = new HashMap<>();

    @JsonAnySetter
    public void addStatus(String orderNumber, Status status) {
        this.statuses.put(orderNumber, status);
    }

    @JsonAnyGetter
    public Map<String, Status> getStatuses() {
        return this.statuses;
    }
}

public enum Status {
    NEW,
    IN_TRANSIT,
    DELIVERED;
}
 

Я использую этот клиент в бизнес-сервисе.

 public class Service  {

    private final MyClient client;

    public Service(MyClient client) {
        this.client = client;
    }

    public Mono<Response> getStatus(Set<String> orderNumbers) {
        var statuses = this.client.output()
                .filter(e -> orderNumbers.contains(e.getKey()))
                .take(orderNumbers.size())
                .collect(this.nullSafeCollector());

        Flux.fromIterable(orderNumbers)
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe(this.client::addOrderNumber);

        return statuses
                .publishOn(Schedulers.boundedElastic())
                .map(t -> new Response(t));
    }

    private <K, V> Collector<Map.Entry<K, V>, Map<K, V>, Map<K, V>> nullSafeCollector() {
        return Collector.of(HashMap::new
                , (m, entry) -> m.put(entry.getKey(), entry.getValue())
                , (m1, m2) -> {
                    m1.putAll(m2);
                    return m1;
                }
                , Collector.Characteristics.UNORDERED);
    }
}

public final class Response {
    private final Map<String, Status> statuses;

    public GetAggregationResponse(Map<String, Status> statuses) {
        this.statuses = statuses;
    }

    public Map<String, Status> getStatuses() {
        return this.statuses;
    }
}

 

Эта бизнес-служба используется со следующего контроллера.

 @RestController
@RequestMapping(value = "/collect", produces = APPLICATION_JSON_VALUE)
public class Controller {

    private final Service service;

    public Controller(Service service) {
        this.service = service;
    }

    @GetMapping()
    @ResponseStatus(value = OK)
    private Mono<Response> getAggregation(@RequestParam(name = "order") Set<String> orderNumbers) {
        return this.service.getStatus(orderNumbers);
    }
}
 

Это работает без каких-либо проблем для одного запроса. Но когда я делаю одновременные несколько вызовов службы. Служба зависает и не возвращает ответа всем вызывающим абонентам.

Я запустил приложение с помощью BlockHound, чтобы посмотреть, что блокирует сервер, но с него нет вывода. Как я могу понять, что вызывает зависание?

Я использую Java 15 с Spring Boot 2.4.0, который имеет реактор 3.4.0.

Комментарии:

1. Вы проверили Request область действия вашего класса обслуживания? По умолчанию это так Singleton .

2. Request область @user370305 не поддерживается в spring-webflux.