Одноэлементный буфер Rxjava

#java #kotlin #rx-java #rx-java2

#java #котлин #rx-java #rx-java2

Вопрос:

У меня есть случай, когда есть объект, который публикует объект в случайное время, я хочу собирать его в секунду в буфер и фильтровать по некоторой стратегии, такой как максимальный балл, чтобы убедиться, что в буфере только один объект в секунду.

 subject
    .buffer(1L, TimeUnit.SECONDS)
    .filter {
        isNotEmpty
    }
    .doOnNext {
        // I get all object in the one second
        // That waste too much memory, the non-max object shouldn't be put into the buffer
        _.asScala.max(byScore)
    }
    .ignoreElements
    .subscribeOn(Schedulers.io)
    .subscribe
 

Этот код будет содержать весь объект за одну секунду и вернется ко мне.

Это не то, чего я хочу.

Есть ли какое-либо решение?

Ответ №1:

Вы можете использовать следующую версию buffer оператора:

 .buffer(long timespan, TimeUnit unit, Scheduler scheduler, int count,
       Callable<U> bufferSupplier,
       boolean restartTimerOnMaxSize)
 

Это позволяет вам определить вашу пользовательскую bufferSupplier коллекцию, используемую для хранения буферизованных значений. Затем вы можете создать свою пользовательскую версию коллекции, в которой вы храните максимум один элемент и, в нашем случае, заменяете существующее значение, если появляется новое, большее:

 class SingleItemMaxCollection : ArrayList<Long>() {

    override fun add(element: Long): Boolean {
        return when {
            size == 1 amp;amp; get(0) < element -> { super.set(0, element); true }
            size == 0 -> { super.add(element); true }
            else -> false
        }
    }
}
 

Демонстрация, как вы можете использовать его для некоторых поддельных данных (элементы, передаваемые каждые 400 мс):

 class SO65020891 {

    private fun dataProvider() = Observable.just(1L, 2L, 3L, 4L, 5L, 6L)
        .concatMap { Observable.just(it).delay(400, TimeUnit.MILLISECONDS) }

    private fun getCollection(): () -> SingleItemMaxCollection = { SingleItemMaxCollection() }

    fun getBufferedMax(): Observable<Long> {
        return dataProvider()
            .buffer(1, TimeUnit.SECONDS, Schedulers.computation(), 2, getCollection(), false)
            .filter { it.isNotEmpty() }
            .map { it[0] }
    }
}
 

И, наконец, некоторая проверка:

 class SO65020891Test {

    @Test
    fun maxEmittedValuesReturnedWithinWindows() {
        val tested = SO65020891()

        val values = tested.getBufferedMax().blockingIterable().toList()

        assertEquals(listOf(2L, 4L, 6L), values)
    }
}