#java #spring-webflux #project-reactor
#java #spring-webflux #проект-реактор
Вопрос:
Я загружаю данные с помощью цепочки реакторов:
public Flux<Report> collectReport(List<MarkId> marks) {
return Flux.fromIterable(marks)
.flatMap(this::prepareRequest)
.collectList()
.flatMapIterable(e -> e)
.delayElements(Duration.ofMillis(200))
.concatMap(this::createReport)
.retryWhen(retryConfig)
.onErrorResume(throwable -> {
log.error(throwable.getMessage());
return Mono.empty();
});
.flatMap(response -> some actions here..//)
.buffer(1000)
.publishOn(Schedulers.newParallel("The rep saving", 4))
.flatMap(googleAnalyticsReports -> {
//saving to database here
}
);
}
concatMap(this::createReport)
может привести к ошибкам, и повторная попытка будет выполнена. Таким образом, приложение будет остановлено и не будет отправлять другие запросы до тех пор, пока не будут исчерпаны повторные попытки для одного идентификатора.
Чтобы увеличить скорость загрузки, я решил заменить это concatMap
на flatMap
. Но flatMap
это не так предсказуемо. Теперь, если возникает ошибка, приложение продолжит отправлять запросы из других потоков, игнорируя тот факт, что некоторые идентификаторы уже находятся в retry
регистре, и при таком поведении я продолжу получать ошибку 429 с сервера (ограничения API) до того, как тайм-аут будет активен.
Итак, мой вопрос: как я могу прекратить загрузку данных из flatMap и подождать, пока все попытки не будут исчерпаны, если произошла какая-то конкретная ошибка для определенного идентификатора? Если я получаю ошибку 429, приложение должно прекратить загрузку данных для текущего идентификатора, затем переходит к повторной попытке, где я могу подождать, пока пройдет время ожидания. Пока этот идентификатор не передан, приложение не должно отправлять другие запросы на другие идентификаторы из списка.
Ответ №1:
То, что вы пытаетесь, невозможно flatMap
по той простой причине, что flatMap
он охотно подписывается на внутренние потоки, то есть он не будет ждать завершения одного потока, прежде чем он подпишется на следующий. Поскольку createReport является асинхронным, как только работа выгружается в I/O
поток, flatMap
создается новый поток из следующего элемента и подписывается на него. Сравните это с concatMap
ожиданием завершения внутреннего потока перед подпиской на следующий.
То, что вы хотите сделать, немного противоречит само по себе — вы хотите увеличить скорость загрузки (поэтому вы хотите распараллелить вызовы), но затем вы хотите упорядочить эти запросы. Вы хотите, чтобы запрос знал о статусе любого предыдущего запроса. flatMap
не предоставляет никаких гарантий упорядочения, поэтому вы не можете использовать этот оператор здесь.