Весенний поток: Правильно улавливать исключения в Моно.отложить

#spring-boot #spring-webflux

Вопрос:

Это мой код:

 Flux.fromStream(ids.stream())
    .flatMap(id -> Mono.defer(() -> {
        String entity = myRepository.findById(id);
        return Mono.just(entity);
    })
)
.collectList()
 

Я хотел бы знать, как правильно улавливать каждое исключение, в которое может быть введено myRepository.findById .

Что о:

 Scheduler scheduler = Schedulers.fromExecutor(...);
Flux.fromStream(ids.stream())
    .flatMap(id -> Mono.defer(() -> {
        String entity = myRepository.findById(id);
        return Mono.just(entity);
    }).subscribeOn(scheduler),
    10
)
.collectList()
.block();
 

Комментарии:

1. Не могли бы вы объяснить причину использования Mono#defer в этой ситуации? Но чтобы ответить, вы заключаете свой вызов в try catch и возвращаете либо значение, либо ошибку Mono.в предложении catch.

2. Что вы подразумеваете под «правильно улавливать каждое исключение»? Вы можете оставить код как есть, и любое возникающее исключение будет просто распространено при block() вызове, который завершит поток, или, если вы хотите подавить исключения и продолжить реактивный поток, вы можете обернуть код flatMap в try / catch. Этот код выглядит ужасно подозрительно, хотя это ненормально, что вы хотите обернуть блокирующий код в реактивные конструкции, подобные этой.

3. Мне нужно параллельно обрабатывать каждый идентификатор (получить сущность), чтобы повысить производительность. Проблема возникает, когда два Mono.defer(...) вызывают исключение «параллельно»… Должен ли я использовать Mono.error() ?

4. До сих пор не ясно, какова ваша цель при обработке ошибок. Вы хотите подавить их всех? Вы хотите, чтобы они выскочили и прервали поток, как только произойдет первая ошибка? Вы хотите попробовать выполнить их все и выдать ошибку только в конце, если таковые были?

5. я не думаю, что вам нужно использовать отсрочку внутри flatmap, вы можете использовать .onErrorMap() функцию для обработки ошибок, а также, как только ошибка распространится, она разорвет восходящую цепочку, и ошибка распространится на нисходящую