#kotlin #reactive-programming #spring-webflux #project-reactor
#kotlin #реактивное программирование #spring-webflux #проект-реактор
Вопрос:
У меня есть поток строк, которые должны быть преобразованы в поток dto. Синтаксический анализ может быть завершен с ошибкой, и по бизнес-правилам мне просто нужно пропускать такие записи
Если я использую null «Котлина» — я получаю NPE, потому что по дизайну reactor не принимает нули в .map
fun toDtoFlux(source:Flux<String>):Flux<Dto>{
source.map(Parser::parse)
.filter(it!=null)
}
object Parser{
fun parse(line:String):Dto?{
..
}
}
Я могу использовать Optional. Но это не способ Kotlin.
fun toDtoFlux(source:Flux<String>):Flux<Dto>{
source.map(Parser::parse)
.filter(Optional.isPresent)
.map(Optional::get)
}
object Parser{
fun parse(line:String):Optional<Dto>{
..
}
}
Какой наиболее идиоматический способ обработки таких случаев в Kotlin?
Ответ №1:
Вы можете создать функцию расширения:
fun <T, U> Flux<T>.mapNotNull(mapper: (T) -> U?): Flux<U> =
this.flatMap { Mono.justOrEmpty(mapper(it)) }
Тогда вы можете использовать его следующим образом:
fun main() {
Flux.just("a", "b", "c")
.mapNotNull { someNullableMapFunction(it) }
.doOnNext { println(it) } // prints "a" and "c"
.blockLast()
}
fun someNullableMapFunction(it: String): String? {
if (it == "b") {
return null
}
return it
}
Обновить
На основе комментария Саймона реализация функции расширения может быть более идиоматичной (и производительной?) В Reactor таким образом:
fun <T, U> Flux<T>.mapNotNull(mapper: (T) -> U?): Flux<U> =
this.handle { item, sink -> mapper(item)?.let { sink.next(it) } }
Комментарии:
1. в качестве альтернативы, рассмотрим
handle
вместоflatMap
дляmapNotNull
реализации
Ответ №2:
Решения, которые я вижу :
Использование API Reactor
Я бы посоветовал вам использовать Reactor API для решения такого случая и заставить ваш анализатор возвращать Mono. Пустой Mono означает отсутствие результата. При этом вы можете использовать flatMap вместо цепочки map / filter / map.
Это может показаться немного излишним, но это позволит любой реализации синтаксического анализатора выполнять асинхронные действия в будущем, если это необходимо (получение информации из стороннего сервиса, ожидание проверки от пользователя и т. Д.).
И он также предоставляет мощный API для управления ошибками синтаксического анализа, поскольку вы можете определять политики возврата / пользовательских ошибок для результата синтаксического анализа.
Это изменило бы ваш пример таким образом :
fun interface Parser {
fun parse(record: String): Mono<Dto>;
}
fun Parser.toDtoFlux(source:Flux<String>): Flux<Dto> {
source.flatMap(this::parse)
}
Использование закрытого класса
Kotlin предлагает другие способы управления параметрами результата, вдохновленные функциональным программированием. Один из способов — использовать закрытые классы для определения набора общих случаев для обработки при синтаксическом анализе. Это позволяет моделировать богатые результаты, предоставляя пользователям синтаксического анализатора несколько вариантов обработки ошибок.
sealed class ParseResult
class Success(val value: Dto) : ParseResult
class Failure(val reason : Exception) : ParseResult
object EmptyRecord : ParseResult
fun interface Parser {
fun parse(raw: String) : ParseResult
}
fun Parser.toDtoFlux(source:Flux<String>): Flux<Dto> {
return source.map(this::parse)
.flatMap { when (it) {
is Success -> Mono.just(it.value)
is Failure -> Mono.error(it.reason) // Or Mono.empty if you don't care
is EmptyRecord -> Mono.empty()
}}
}