#java #project-reactor
Вопрос:
Мне нужно преобразовать этап завершения, возвращаемый внешней библиотекой, в Моно внутри моего реактивного конвейера. Как мне обрабатывать исключения, возвращенные из вызова (я хочу проигнорировать их и продолжить последовательность)? Операторы onErrorResume / doOn* не вызываются, когда внешний вызов создает исключение (возможно, потому, что Mono никогда не создается из-за исключения).
private void example() {
Flux.range(1, 2)
.flatMap(i ->
Mono.fromCompletionStage(externalCall(i))
.doOnNext(ni -> System.out.println("onNext: " ni))
.doOnError(err -> System.err.println("onError: " err.getMessage()))
.onErrorResume(e -> Mono.empty())
)
.subscribe();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
CompletionStage<String> externalCall(int i) {
if (new Random().nextBoolean()){
throw new RuntimeException("Exception in external call");
}
return Mono.just(i)
.map(e -> String.valueOf((char) (e 64)))
.toFuture();
}
Трассировка стека
2021-10-11T02:16:09,944 main r.c.p.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Exception in external call
Caused by: java.lang.RuntimeException: Exception in external call
at lrn.chap.SubscribingInFlatMap.externalCall(SubscribingInFlatMap.java:107)
at lrn.chap.SubscribingInFlatMap.lambda$example$10(SubscribingInFlatMap.java:91)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386)
at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:156)
at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:111)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:69)
at reactor.core.publisher.Flux.subscribe(Flux.java:8468)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8641)
at reactor.core.publisher.Flux.subscribe(Flux.java:8438)
Process finished with exit code 0
Ответ №1:
Вы можете использовать Mono.defer()
его для преобразования в ленивого издателя, как это:
Mono.defer(() -> Mono.fromCompletionStage(externalCall(i)))
.doOnNext(ni -> System.out.println("onNext: " ni))
.doOnError(err -> System.err.println("onError: " err.getMessage()))
.onErrorResume(e -> Mono.empty()))
Ответ №2:
defer
Метод, упомянутый в другом ответе, является хорошим вариантом, но есть еще один более короткий вариант, который использует лямбда-версию fromCompletionStage
метода:
Mono.fromCompletionStage(() -> externalCall(i))
.doOnNext(ni -> System.out.println("onNext: " ni))
.doOnError(err -> System.err.println("onError: " err.getMessage()))
.onErrorResume(e -> Mono.empty()))
Точно так defer
же , это гарантирует, что externalCall
метод вызывается по требованию, а ошибки обрабатываются как часть реактивной цепочки.
Комментарии:
1. Я действительно разрываюсь между принятием вашего ответа и другого (отложить), так как оба работают нормально. Я собираюсь принять ответ Mono.defer (), так как я чувствую, что это делает намерение немного более явным. Спасибо!