#java #spring-boot #reactive-programming #spring-webflux #project-reactor
#java #весенняя загрузка #реактивное программирование #spring-webflux #проект-реактор
Вопрос:
У нас есть служба, которая медленно отвечает в среднем на 2 секунды, какую бы полезную нагрузку мы ни отправляли. Мы обрабатываем несколько запросов, чтобы сэкономить некоторую пропускную способность и не заглушить эту службу.
У меня есть этот клиент WebClient
для сбора и выполнения запросов.
public class MyClient {
private final WebClient client;
private final Sinks.Many<String> requestSink;
private final Flux<Map.Entry<String, TrackStatus>> output;
public MyClient(URI uri, int queueSize, Duration timeout) {
this.client = WebClient.create(uri.toString());
this.requestSink = Sinks.many().multicast().onBackpressureBuffer(100, false);
this.output = this.requestSink.asFlux().bufferTimeout(queueSize, timeout)
.publishOn(Schedulers.boundedElastic())
.flatMap(orderNumbers -> this.client.get()
.uri(uriBuilder -> uriBuilder
.path("/orders")
.queryParam("q", orderNumbers)
.build())
.retrieve()
.bodyToMono(Response.class)
.publishOn(Schedulers.boundedElastic())
.timeout(Duration.ofSeconds(3))
.onErrorReturn(this.createEmptyResponse(orderNumbers)))
.flatMap(shipmentsResponse -> Flux.fromIterable(shipmentsResponse.getStatuses().entrySet()));
}
private Response createEmptyResponse(List<String> orderNumbers) {
var response = new Response();
for (String orderNumber : orderNumbers) {
response.addStatus(orderNumber, null);
}
return response;
}
public void addOrderNumber(String orderNumber) {
var emitResult = this.requestSink.tryEmitNext(orderNumber);
while (emitResult.isFailure()) {
emitResult = this.requestSink.tryEmitNext(orderNumber);
}
}
public Flux<Map.Entry<String, TrackStatus>> output() {
return this.output;
}
}
public class Response {
private Map<String, Status> statuses = new HashMap<>();
@JsonAnySetter
public void addStatus(String orderNumber, Status status) {
this.statuses.put(orderNumber, status);
}
@JsonAnyGetter
public Map<String, Status> getStatuses() {
return this.statuses;
}
}
public enum Status {
NEW,
IN_TRANSIT,
DELIVERED;
}
Я использую этот клиент в бизнес-сервисе.
public class Service {
private final MyClient client;
public Service(MyClient client) {
this.client = client;
}
public Mono<Response> getStatus(Set<String> orderNumbers) {
var statuses = this.client.output()
.filter(e -> orderNumbers.contains(e.getKey()))
.take(orderNumbers.size())
.collect(this.nullSafeCollector());
Flux.fromIterable(orderNumbers)
.subscribeOn(Schedulers.boundedElastic())
.subscribe(this.client::addOrderNumber);
return statuses
.publishOn(Schedulers.boundedElastic())
.map(t -> new Response(t));
}
private <K, V> Collector<Map.Entry<K, V>, Map<K, V>, Map<K, V>> nullSafeCollector() {
return Collector.of(HashMap::new
, (m, entry) -> m.put(entry.getKey(), entry.getValue())
, (m1, m2) -> {
m1.putAll(m2);
return m1;
}
, Collector.Characteristics.UNORDERED);
}
}
public final class Response {
private final Map<String, Status> statuses;
public GetAggregationResponse(Map<String, Status> statuses) {
this.statuses = statuses;
}
public Map<String, Status> getStatuses() {
return this.statuses;
}
}
Эта бизнес-служба используется со следующего контроллера.
@RestController
@RequestMapping(value = "/collect", produces = APPLICATION_JSON_VALUE)
public class Controller {
private final Service service;
public Controller(Service service) {
this.service = service;
}
@GetMapping()
@ResponseStatus(value = OK)
private Mono<Response> getAggregation(@RequestParam(name = "order") Set<String> orderNumbers) {
return this.service.getStatus(orderNumbers);
}
}
Это работает без каких-либо проблем для одного запроса. Но когда я делаю одновременные несколько вызовов службы. Служба зависает и не возвращает ответа всем вызывающим абонентам.
Я запустил приложение с помощью BlockHound, чтобы посмотреть, что блокирует сервер, но с него нет вывода. Как я могу понять, что вызывает зависание?
Я использую Java 15 с Spring Boot 2.4.0, который имеет реактор 3.4.0.
Комментарии:
1. Вы проверили
Request
область действия вашего класса обслуживания? По умолчанию это такSingleton
.2.
Request
область @user370305 не поддерживается в spring-webflux.