Поток вызова веб-потока Spring Webflux / Reactor Release

#netty #reactive-programming #spring-webflux #project-reactor

#netty #реактивное программирование #spring-webflux #проект-реактор

Вопрос:

Итак, я понимаю, что под капотом Spring WebFlux и Reactor используют netty для nio, теперь я хотел бы выпустить вызывающий поток, чтобы освободить ресурсы для обработки большего количества запросов. Является ли приведенный ниже простой код освобождением вызывающего потока?

 @GetMapping("/foo")
public Mono<Void> bar() {

  someService.veryLongSyncOperation();

  return Mono.empty();
}
  

Я не заключал вызов службы в поток / Mono, я просто хочу сначала убедиться, что вызывающий поток освобожден, пока служба выполняет свою длительную работу. Достаточно ли этого для достижения освобождения вызывающего потока? Если да, то есть ли способ проверить это?

Я думал, что фреймворк видит возвращаемый тип, и этого достаточно, чтобы он знал, что он должен освободить вызывающий поток.

Ответ №1:

Вы могли бы использовать, .subscribeOn(Schedulers.elastic()) как указано в справочном руководстве по реактору

 @GetMapping("/foo")
public Mono<Void> bar() {
    return Mono.fromCallable(() -> someService.veryLongSyncOperation())
            .subscribeOn(Schedulers.elastic())
            .then();
}
  

каждая подписка будет выполняться на выделенном однопоточном рабочем из Schedulers.elastic().

Upd: Теперь есть Schedulers.boundedElastic() планировщик. Я бы рекомендовал использовать его по умолчанию.

Комментарии:

1. Происходит ли это только внутри метода контроллера? У меня есть метод в приложении, который необходимо выполнить как новый поток, а основной поток должен просто вернуть

Ответ №2:

Нет. В этом случае вы вызываете долго выполняющийся процесс в потоке ввода-вывода Netty. Самый простой способ, который я могу придумать для этого, — создать моно-приемник и запустить длинную операцию в новом потоке (или, возможно, через пул потоков). Когда операция завершается успешно, вы вызываете sink.success() , и если это не удается, вы вызываете, sink.error(x) передавая созданное исключение.

 @GetMapping("/foo")
public Mono<Void> bar() {
    return Mono.create(sink -> {
        new Thread(() -> {
            try {
                someService.veryLongSyncOperation();
                sink.success();
            } catch (Exception ex) {
                sink.error(ex);
            }
        }).start();
    });  
}
  

Вызывающий поток возвращается сразу после настройки потока, и WebFlux подписывается на возвращенный Mono, который запускает поток для запуска в новом потоке.