#project-reactor
#проект-реактор
Вопрос:
У меня есть процессор, который подписывается на издателей, которые прибывают в произвольное время. Для каждого нового подписчика на процессор я хочу передавать последний элемент от каждого издателя.
class PublishersState {
val outputProcessor = DirectProcessor.create<String>()
fun addNewPublisher(publisher: Flux<String>) {
publisher.subscribe(outputProcessor)
}
fun getAllPublishersState(): Flux<String> = outputProcessor
}
val publisher1 = Mono
.just("Item 1 publisher1")
.mergeWith(Flux.never())
val publisher2 = Flux
.just("Item 1 publisher2", "Item 2 publisher2")
.mergeWith(Flux.never())
val publishersState = PublishersState()
publishersState.getAllPublishersState().subscribe {
println("Subscriber1: $it")
}
publishersState.addNewPublisher(publisher1)
publishersState.addNewPublisher(publisher2)
publishersState.getAllPublishersState().subscribe {
println("Subscriber2: $it")
}
Мне нужно изменить приведенный выше код, чтобы он выводил следующее:
Subscriber1: Item 1 publisher1
Subscriber1: Item 1 publisher2
Subscriber1: Item 2 publisher2
// Subscriber2 subscribers here and receives the last item from each publisher
Subscriber2: Item 1 publisher1
Subscriber2: Item 2 publisher2
Есть ли простой способ кэшировать последний элемент для каждого издателя?
Ответ №1:
Используйте ReplayProcessor
вместо DirectProcessor
:
val outputProcessor = ReplayProcessor.cacheLast()
Комментарии:
1. ReplayProcessor воспроизводит последнее значение для позднего подписчика, в то время как мне нужно последнее значение от каждого издателя, на которого подписан процессор.
Ответ №2:
Я решил свой случай следующим образом:
class PublishersState {
val publishersList = Collections.synchronizedList<Flux<String>>(mutableListOf()) // adding sync list for storing publishers
val outputProcessor = DirectProcessor.create<String>()
fun addNewPublisher(publisher: Flux<String>) {
val cached = publisher.cache(1) // caching the last item for a new publisher
publishersList.add(cached)
cached.subscribe(outputProcessor)
}
fun getAllPublishersState(): Flux<String> = publishersList
.toFlux()
.reduce(outputProcessor as Flux<String>) { acc, flux -> acc.mergeWith(flux.take(1)) } // merging the last item of each publisher with outputProcessor
.flatMapMany { it }
}