#rx-java2 #rx-kotlin2
#rx-java2 #rx-kotlin2
Вопрос:
У меня есть цепочка, в которой я выполняю некоторые блокирующие вызовы ввода-вывода (например, HTTP-вызов). Я хочу, чтобы блокирующий вызов потреблял значение, продолжался без прерывания, но отбрасывал все, что накапливается тем временем, а затем таким же образом использовал следующее значение.
Рассмотрим следующий пример:
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS).onBackpressureLatest().map {
Thread.sleep(1000)
it
}.blockingForEach { println(it) }
}
С наивной точки зрения, я бы ожидал, что он напечатает что-то вроде 0, 10, 20, ...
, но он печатает 0, 1, 2, ...
.
Что я делаю не так?
Редактировать:
Я подумал о наивном добавлении debounce
, чтобы съесть входящий поток:
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS)
.debounce(0, TimeUnit.MILLISECONDS)
.map {
Thread.sleep(1000)
it
}
.blockingForEach { println(it) }
}
Но теперь я получаю java.lang.InterruptedException: sleep interrupted
.
Редактировать:
Похоже, работает следующее:
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS)
.throttleLast(0, TimeUnit.MILLISECONDS)
.map {
Thread.sleep(1000)
it
}
.blockingForEach { println(it) }
}
Результат, как и ожидалось 0, 10, 20, ...
!!
Это правильный путь?
Я отметил, что throttleLast
переключится на планировщик вычислений. Есть ли способ вернуться к исходному планировщику?
Редактировать:
Я также иногда java.lang.InterruptedException: sleep interrupted
сталкиваюсь с этим вариантом.
Комментарии:
1. Регулирование иногда может отменить поток, который он излучает, когда таймер заканчивается до того, как этот поток завершит обработку предыдущего регулируемого элемента. Добавьте
observeOn
передmap
, чтобы переместить обработку из потока регулятора.2. Итак, выбрасывание случайных исключений — это предполагаемое поведение?
3. Избегайте блокировки в стандартных операторах, и все будет в порядке.
4. Некоторый сторонний код, который использует блокировки / мьютексы, приводит к тому же поведению.
Ответ №1:
Самый простой подход к решению проблемы:
fun <T> Flowable<T>.lossy() : Flowable<T> {
return onBackpressureLatest().observeOn(Schedulers.io(), false, 1)
}
При вызове lossy
a Flowable
он начинает отбрасывать все элементы, которые поступают быстрее, чем может обработать нижестоящий потребитель.