Как обрабатывать исключения для Mono.fromCompletionStage

#java #project-reactor

Вопрос:

Мне нужно преобразовать этап завершения, возвращаемый внешней библиотекой, в Моно внутри моего реактивного конвейера. Как мне обрабатывать исключения, возвращенные из вызова (я хочу проигнорировать их и продолжить последовательность)? Операторы onErrorResume / doOn* не вызываются, когда внешний вызов создает исключение (возможно, потому, что Mono никогда не создается из-за исключения).

 private void example() {
    Flux.range(1, 2)
            .flatMap(i ->
                    Mono.fromCompletionStage(externalCall(i))
                            .doOnNext(ni -> System.out.println("onNext: "   ni))
                            .doOnError(err -> System.err.println("onError: "   err.getMessage()))
                            .onErrorResume(e -> Mono.empty())
            )
            .subscribe();

    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

CompletionStage<String> externalCall(int i) {
    if (new Random().nextBoolean()){
        throw new RuntimeException("Exception in external call");
    }

    return Mono.just(i)
            .map(e -> String.valueOf((char) (e   64)))
            .toFuture();
}
 

Трассировка стека

 2021-10-11T02:16:09,944 main r.c.p.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Exception in external call
Caused by: java.lang.RuntimeException: Exception in external call
    at lrn.chap.SubscribingInFlatMap.externalCall(SubscribingInFlatMap.java:107)
    at lrn.chap.SubscribingInFlatMap.lambda$example$10(SubscribingInFlatMap.java:91)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386)
    at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:156)
    at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:111)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371)
    at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:69)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8468)
    at reactor.core.publisher.Flux.subscribeWith(Flux.java:8641)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8438)

Process finished with exit code 0
 

Ответ №1:

Вы можете использовать Mono.defer() его для преобразования в ленивого издателя, как это:

 Mono.defer(() -> Mono.fromCompletionStage(externalCall(i)))
    .doOnNext(ni -> System.out.println("onNext: "   ni))
    .doOnError(err -> System.err.println("onError: "   err.getMessage()))
    .onErrorResume(e -> Mono.empty()))
 

Ответ №2:

defer Метод, упомянутый в другом ответе, является хорошим вариантом, но есть еще один более короткий вариант, который использует лямбда-версию fromCompletionStage метода:

 Mono.fromCompletionStage(() -> externalCall(i))
    .doOnNext(ni -> System.out.println("onNext: "   ni))
    .doOnError(err -> System.err.println("onError: "   err.getMessage()))
    .onErrorResume(e -> Mono.empty()))
 

Точно так defer же , это гарантирует, что externalCall метод вызывается по требованию, а ошибки обрабатываются как часть реактивной цепочки.

Комментарии:

1. Я действительно разрываюсь между принятием вашего ответа и другого (отложить), так как оба работают нормально. Я собираюсь принять ответ Mono.defer (), так как я чувствую, что это делает намерение немного более явным. Спасибо!