Использование `onBackpressureLatest` для удаления промежуточных сообщений в блокирующем потоке

#rx-java2 #rx-kotlin2

#rx-java2 #rx-kotlin2

Вопрос:

У меня есть цепочка, в которой я выполняю некоторые блокирующие вызовы ввода-вывода (например, HTTP-вызов). Я хочу, чтобы блокирующий вызов потреблял значение, продолжался без прерывания, но отбрасывал все, что накапливается тем временем, а затем таким же образом использовал следующее значение.

Рассмотрим следующий пример:

 fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS).onBackpressureLatest().map {
    Thread.sleep(1000)
    it
  }.blockingForEach { println(it) }
}
  

С наивной точки зрения, я бы ожидал, что он напечатает что-то вроде 0, 10, 20, ... , но он печатает 0, 1, 2, ... .

Что я делаю не так?

Редактировать:

Я подумал о наивном добавлении debounce , чтобы съесть входящий поток:

 fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS)
    .debounce(0, TimeUnit.MILLISECONDS)
    .map {
      Thread.sleep(1000)
      it
    }
    .blockingForEach { println(it) }
}
  

Но теперь я получаю java.lang.InterruptedException: sleep interrupted .

Редактировать:

Похоже, работает следующее:

 fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS)
    .throttleLast(0, TimeUnit.MILLISECONDS)
    .map {
      Thread.sleep(1000)
      it
    }
    .blockingForEach { println(it) }
}
  

Результат, как и ожидалось 0, 10, 20, ... !!

Это правильный путь?

Я отметил, что throttleLast переключится на планировщик вычислений. Есть ли способ вернуться к исходному планировщику?

Редактировать:

Я также иногда java.lang.InterruptedException: sleep interrupted сталкиваюсь с этим вариантом.

Комментарии:

1. Регулирование иногда может отменить поток, который он излучает, когда таймер заканчивается до того, как этот поток завершит обработку предыдущего регулируемого элемента. Добавьте observeOn перед map , чтобы переместить обработку из потока регулятора.

2. Итак, выбрасывание случайных исключений — это предполагаемое поведение?

3. Избегайте блокировки в стандартных операторах, и все будет в порядке.

4. Некоторый сторонний код, который использует блокировки / мьютексы, приводит к тому же поведению.

Ответ №1:

Самый простой подход к решению проблемы:

 fun <T> Flowable<T>.lossy() : Flowable<T> {
  return onBackpressureLatest().observeOn(Schedulers.io(), false, 1)
}
  

При вызове lossy a Flowable он начинает отбрасывать все элементы, которые поступают быстрее, чем может обработать нижестоящий потребитель.