Должны ли Flux.then и Mono.then вести себя по-разному в случае ошибки?

#kotlin #project-reactor

#kotlin #проект-реактор

Вопрос:

Я столкнулся со случаем, когда у меня есть вложенный поток. Меня не волнуют отдельные результаты внутреннего потока, поскольку он возвращает единицу (в Kotlin / Void в Java), но я хочу знать, прерван ли поток из-за ошибки или нет. Я думал, что смогу использовать then функцию, как указано в документе: Error signal is replayed in the resulting Mono<V>

Моя проблема может быть сведена к минимальному модульному тестированию (Kotlin):

     @Test
    fun fluxTest() {
        val flux = Flux.just("willFail", "willSucceed")
            .flatMap { outer ->
                // In my real world example the inner flux is created via Flux.fromIterable from a property of the
                // outer`-object
                Flux.just(1)
                    .flatMap { inner ->
                        // this simulates a Mono.fromSupplier that can throw exceptions
                        if (outer == "willFail") Mono.error<Unit>(RuntimeException("bam"))
                        else Mono.just(Unit)
                    }
                    // We don't care about the Flux as it returns Unit/Void
                    // All we want to know is, whether there was an error or not
                    .then(Mono.just(outer))
            }
            .onErrorContinue { error, item -> println("$item => $error") }
            .collectList()

        StepVerifier.create(flux)
            .expectNextMatches { it.size == 1 }
            .verifyComplete()
    }
  

Итак, у нас есть 2 элемента. Во внутреннем потоке один из элементов завершится ошибкой при обработке, а другой — нет. Я ожидаю, что ошибка будет распространяться по конвейеру, где она будет обнаружена и отброшена в onErrorContinue .
Поэтому я ожидаю, что в результирующем списке будет 1 элемент, но я получаю исходные 2. Я понятия не имею, почему.

Теперь начинается самое интересное: в этом конкретном тестовом примере я могу заменить Flux.just(1) на Mono.just(1) (в моем реальном случае это не работает ofc, потому что поток содержит более 1 элемента), и внезапно мой тест проходит:

     @Test
    fun fluxTest() {
        val flux = Flux.just("willFail", "willSucceed")
            .flatMap { outer ->
                // In my real world example the inner flux is created via Flux.fromIterable from a property of the
                // outer`-object
                Mono.just(1)
                    .flatMap { inner ->
                        // this simulates a Mono.fromSupplier that can throw exceptions
                        if (outer == "willFail") Mono.error<Unit>(RuntimeException("bam"))
                        else Mono.just(Unit)
                    }
                    // We don't care about the Flux as it returns Unit/Void
                    // All we want to know is, whether there was an error or not
                    .then(Mono.just(outer))
            }
            .onErrorContinue { error, item -> println("$item => $error") }
            .collectList()

        StepVerifier.create(flux)
            .expectNextMatches { it.size == 1 }
            .verifyComplete()
    }
  

Итак, очевидно, что есть разница в Mono.then(Mono<T>) и в Flux.then(Mono<T>) , но этого не должно быть, поскольку Javadoc одинаковый, верно?

Примечание сбоку: вместо Flux.then(Mono.just(outer)) я тоже пробовал Mono.defer , но это ничего не меняет.

Комментарии:

1. Это странно. Просматривая проблемы проекта, я нашел эту, которая может быть связана с вашей проблемой. По-видимому, существует скрытая сложность, onErrorContinue которая может привести к неожиданному поведению.