#kotlin #project-reactor
#kotlin #проект-реактор
Вопрос:
Я столкнулся со случаем, когда у меня есть вложенный поток. Меня не волнуют отдельные результаты внутреннего потока, поскольку он возвращает единицу (в Kotlin / Void в Java), но я хочу знать, прерван ли поток из-за ошибки или нет. Я думал, что смогу использовать then
функцию, как указано в документе: Error signal is replayed in the resulting Mono<V>
Моя проблема может быть сведена к минимальному модульному тестированию (Kotlin):
@Test
fun fluxTest() {
val flux = Flux.just("willFail", "willSucceed")
.flatMap { outer ->
// In my real world example the inner flux is created via Flux.fromIterable from a property of the
// outer`-object
Flux.just(1)
.flatMap { inner ->
// this simulates a Mono.fromSupplier that can throw exceptions
if (outer == "willFail") Mono.error<Unit>(RuntimeException("bam"))
else Mono.just(Unit)
}
// We don't care about the Flux as it returns Unit/Void
// All we want to know is, whether there was an error or not
.then(Mono.just(outer))
}
.onErrorContinue { error, item -> println("$item => $error") }
.collectList()
StepVerifier.create(flux)
.expectNextMatches { it.size == 1 }
.verifyComplete()
}
Итак, у нас есть 2 элемента. Во внутреннем потоке один из элементов завершится ошибкой при обработке, а другой — нет. Я ожидаю, что ошибка будет распространяться по конвейеру, где она будет обнаружена и отброшена в onErrorContinue
.
Поэтому я ожидаю, что в результирующем списке будет 1 элемент, но я получаю исходные 2. Я понятия не имею, почему.
Теперь начинается самое интересное: в этом конкретном тестовом примере я могу заменить Flux.just(1)
на Mono.just(1)
(в моем реальном случае это не работает ofc, потому что поток содержит более 1 элемента), и внезапно мой тест проходит:
@Test
fun fluxTest() {
val flux = Flux.just("willFail", "willSucceed")
.flatMap { outer ->
// In my real world example the inner flux is created via Flux.fromIterable from a property of the
// outer`-object
Mono.just(1)
.flatMap { inner ->
// this simulates a Mono.fromSupplier that can throw exceptions
if (outer == "willFail") Mono.error<Unit>(RuntimeException("bam"))
else Mono.just(Unit)
}
// We don't care about the Flux as it returns Unit/Void
// All we want to know is, whether there was an error or not
.then(Mono.just(outer))
}
.onErrorContinue { error, item -> println("$item => $error") }
.collectList()
StepVerifier.create(flux)
.expectNextMatches { it.size == 1 }
.verifyComplete()
}
Итак, очевидно, что есть разница в Mono.then(Mono<T>)
и в Flux.then(Mono<T>)
, но этого не должно быть, поскольку Javadoc одинаковый, верно?
Примечание сбоку: вместо Flux.then(Mono.just(outer))
я тоже пробовал Mono.defer
, но это ничего не меняет.
Комментарии:
1. Это странно. Просматривая проблемы проекта, я нашел эту, которая может быть связана с вашей проблемой. По-видимому, существует скрытая сложность,
onErrorContinue
которая может привести к неожиданному поведению.