Project Reactor: кэшировать последний элемент для каждого подписанного издателя

#project-reactor

#проект-реактор

Вопрос:

У меня есть процессор, который подписывается на издателей, которые прибывают в произвольное время. Для каждого нового подписчика на процессор я хочу передавать последний элемент от каждого издателя.

     class PublishersState {
        val outputProcessor = DirectProcessor.create<String>()

        fun addNewPublisher(publisher: Flux<String>) {
            publisher.subscribe(outputProcessor)
        }

        fun getAllPublishersState(): Flux<String> = outputProcessor
    }

    val publisher1 = Mono
        .just("Item 1 publisher1")
        .mergeWith(Flux.never())

    val publisher2 = Flux
        .just("Item 1 publisher2", "Item 2 publisher2")
        .mergeWith(Flux.never())

    val publishersState = PublishersState()

    publishersState.getAllPublishersState().subscribe {
        println("Subscriber1: $it")
    }

    publishersState.addNewPublisher(publisher1)

    publishersState.addNewPublisher(publisher2)

    publishersState.getAllPublishersState().subscribe {
        println("Subscriber2: $it")
    }
  

Мне нужно изменить приведенный выше код, чтобы он выводил следующее:

 Subscriber1: Item 1 publisher1
Subscriber1: Item 1 publisher2
Subscriber1: Item 2 publisher2
// Subscriber2 subscribers here and receives the last item from each publisher
Subscriber2: Item 1 publisher1
Subscriber2: Item 2 publisher2
  

Есть ли простой способ кэшировать последний элемент для каждого издателя?

Ответ №1:

Используйте ReplayProcessor вместо DirectProcessor :

 val outputProcessor = ReplayProcessor.cacheLast()
  

Комментарии:

1. ReplayProcessor воспроизводит последнее значение для позднего подписчика, в то время как мне нужно последнее значение от каждого издателя, на которого подписан процессор.

Ответ №2:

Я решил свой случай следующим образом:

 class PublishersState {
  val publishersList = Collections.synchronizedList<Flux<String>>(mutableListOf()) // adding sync list for storing publishers 
  val outputProcessor = DirectProcessor.create<String>()

  fun addNewPublisher(publisher: Flux<String>) {
    val cached = publisher.cache(1) // caching the last item for a new publisher
    publishersList.add(cached)
    cached.subscribe(outputProcessor)
  }

  fun getAllPublishersState(): Flux<String> = publishersList
    .toFlux()
    .reduce(outputProcessor as Flux<String>) { acc, flux -> acc.mergeWith(flux.take(1)) } // merging the last item of each publisher with outputProcessor 
    .flatMapMany { it }
}