#java #kotlin #rx-java #rx-java2
#java #котлин #rx-java #rx-java2
Вопрос:
У меня есть случай, когда есть объект, который публикует объект в случайное время, я хочу собирать его в секунду в буфер и фильтровать по некоторой стратегии, такой как максимальный балл, чтобы убедиться, что в буфере только один объект в секунду.
subject
.buffer(1L, TimeUnit.SECONDS)
.filter {
isNotEmpty
}
.doOnNext {
// I get all object in the one second
// That waste too much memory, the non-max object shouldn't be put into the buffer
_.asScala.max(byScore)
}
.ignoreElements
.subscribeOn(Schedulers.io)
.subscribe
Этот код будет содержать весь объект за одну секунду и вернется ко мне.
Это не то, чего я хочу.
Есть ли какое-либо решение?
Ответ №1:
Вы можете использовать следующую версию buffer
оператора:
.buffer(long timespan, TimeUnit unit, Scheduler scheduler, int count,
Callable<U> bufferSupplier,
boolean restartTimerOnMaxSize)
Это позволяет вам определить вашу пользовательскую bufferSupplier
коллекцию, используемую для хранения буферизованных значений. Затем вы можете создать свою пользовательскую версию коллекции, в которой вы храните максимум один элемент и, в нашем случае, заменяете существующее значение, если появляется новое, большее:
class SingleItemMaxCollection : ArrayList<Long>() {
override fun add(element: Long): Boolean {
return when {
size == 1 amp;amp; get(0) < element -> { super.set(0, element); true }
size == 0 -> { super.add(element); true }
else -> false
}
}
}
Демонстрация, как вы можете использовать его для некоторых поддельных данных (элементы, передаваемые каждые 400 мс):
class SO65020891 {
private fun dataProvider() = Observable.just(1L, 2L, 3L, 4L, 5L, 6L)
.concatMap { Observable.just(it).delay(400, TimeUnit.MILLISECONDS) }
private fun getCollection(): () -> SingleItemMaxCollection = { SingleItemMaxCollection() }
fun getBufferedMax(): Observable<Long> {
return dataProvider()
.buffer(1, TimeUnit.SECONDS, Schedulers.computation(), 2, getCollection(), false)
.filter { it.isNotEmpty() }
.map { it[0] }
}
}
И, наконец, некоторая проверка:
class SO65020891Test {
@Test
fun maxEmittedValuesReturnedWithinWindows() {
val tested = SO65020891()
val values = tested.getBufferedMax().blockingIterable().toList()
assertEquals(listOf(2L, 4L, 6L), values)
}
}