#netty #reactive-programming #spring-webflux #project-reactor
#netty #реактивное программирование #spring-webflux #проект-реактор
Вопрос:
Итак, я понимаю, что под капотом Spring WebFlux и Reactor используют netty для nio, теперь я хотел бы выпустить вызывающий поток, чтобы освободить ресурсы для обработки большего количества запросов. Является ли приведенный ниже простой код освобождением вызывающего потока?
@GetMapping("/foo")
public Mono<Void> bar() {
someService.veryLongSyncOperation();
return Mono.empty();
}
Я не заключал вызов службы в поток / Mono, я просто хочу сначала убедиться, что вызывающий поток освобожден, пока служба выполняет свою длительную работу. Достаточно ли этого для достижения освобождения вызывающего потока? Если да, то есть ли способ проверить это?
Я думал, что фреймворк видит возвращаемый тип, и этого достаточно, чтобы он знал, что он должен освободить вызывающий поток.
Ответ №1:
Вы могли бы использовать, .subscribeOn(Schedulers.elastic())
как указано в справочном руководстве по реактору
@GetMapping("/foo")
public Mono<Void> bar() {
return Mono.fromCallable(() -> someService.veryLongSyncOperation())
.subscribeOn(Schedulers.elastic())
.then();
}
каждая подписка будет выполняться на выделенном однопоточном рабочем из Schedulers.elastic().
Upd: Теперь есть Schedulers.boundedElastic()
планировщик. Я бы рекомендовал использовать его по умолчанию.
Комментарии:
1. Происходит ли это только внутри метода контроллера? У меня есть метод в приложении, который необходимо выполнить как новый поток, а основной поток должен просто вернуть
Ответ №2:
Нет. В этом случае вы вызываете долго выполняющийся процесс в потоке ввода-вывода Netty. Самый простой способ, который я могу придумать для этого, — создать моно-приемник и запустить длинную операцию в новом потоке (или, возможно, через пул потоков). Когда операция завершается успешно, вы вызываете sink.success()
, и если это не удается, вы вызываете, sink.error(x)
передавая созданное исключение.
@GetMapping("/foo")
public Mono<Void> bar() {
return Mono.create(sink -> {
new Thread(() -> {
try {
someService.veryLongSyncOperation();
sink.success();
} catch (Exception ex) {
sink.error(ex);
}
}).start();
});
}
Вызывающий поток возвращается сразу после настройки потока, и WebFlux подписывается на возвращенный Mono, который запускает поток для запуска в новом потоке.