#java #spring-webflux #project-reactor #reactor
#java #spring-webflux #проект-реактор #реактор
Вопрос:
Выполняется ли doFinally в том же потоке? Будет ли приведенный ниже код блокировать основной поток?
mono
.map(fileName -> asyncDownloadFile(fileName, folderName))
.doFinally(v -> {
FileUtils.cleanDirectory(folderName); // this method is blocking
});
если да, то каков наилучший способ выполнить cleanDirectory в отдельном потоке в doFinally?
Ответ №1:
Оберните блокирующий вызов в a Runnable
и запустите его на отдельном thread
:
Runnable task = () -> {FileUtils.cleanDirectory(folderName)};
Mono<Object> cleanDirPromise = Mono.fromRunnable(task);
mono
.map(fileName -> asyncDownloadFile(fileName, folderName))
.doFinally(v -> {
cleanDirPromise.subscribeOn(Schedulers.parallel()).subscribe();
});
Примечание: по сути, это будет вызов «запустить и забыть», результат которого вас не будет волновать cleanDirPromise
.
Комментарии:
1.
Schedulers.parallel()
создает фиксированный пул рабочих потоков столько ядер, сколько запущенной системы.Schedulers.boundedElastic()
является ли лучший выборcreates new worker pools as needed and reuses idle ones. Worker pools that stay idle for too long (the default is 60s) are also disposed. ... This is a better choice for I/O blocking work. Schedulers.boundedElastic() is a handy way to give a blocking process its own thread so that it does not tie up other resources.
, взятый из документации projectreactor.io/docs/core/release/reference/#schedulers2.
Schedulers.parallel()
должен использоваться для задач с высокой интенсивностью процессора, которые должны выполнять вычисления на нескольких ядрах.3. и
subscribe
должен быть удален, нет причин подписываться в середине приложения.4. @ThomasAndolf doFinally не подписывается на внутренний поток. Внешняя подписка не имеет ничего общего с подпиской на внутренний поток. Что касается планировщика, это не было частью вопроса. Но да, любые задачи блокировки должны выполняться на elastic.
5. doFinally возвращает Mono, поэтому подписка не требуется projectreactor.io/docs/core/release/api/reactor/core/publisher /…
Ответ №2:
Для этой цели лучше использовать .then()
operator:
mono
.map(fileName -> asyncDownloadFile(fileName, folderName))
.then()
.flatMap(
Mono.fromRunnable(() -> FileUtils.cleanDirectory(folderName))
.subscribeOn(Schedulers.boundedElastic())
)
...
Оператор then()
гарантирует cleanDirectory
, что он будет выполняться после asyncDownloadFile
, также позволяет вам создавать один конвейер и обрабатывать ошибки.
Комментарии:
1. Вам также необходимо добавить свой обработчик в error, но лучше тогда запустить новую подписку в doFinally.