#kotlin #rx-java #reactivex
Вопрос:
У меня есть PublishSubject
приложение, которое выдает обновления местоположения ( LatLng
):
val location = PublishSubject.create<LatLng>()
Теперь я хочу сделать что-то с этим местоположением, что может занять некоторое время и должно быть сделано последовательно:
location
.observeOn(Schedulers.computation())
.subscribeOn(Schedulers.io())
.subscribe {
// ... CPU-heavy operation
}
Только после завершения каждой операции в subscribe
программе может быть запущена следующая. Более того, каждое обновление делает предыдущее устаревшим, поэтому меня интересует только последнее значение. Таким образом, подписчик должен получать только последнее значение на данный момент при каждом обновлении. Поэтому я подумал о применении противодавления:
location
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.toFlowable(BackpressureStrategy.LATEST)
.subscribeWith(object : DisposableSubscriber<LatLng>() {
override fun onStart() {
request(1)
}
override fun onError(t: Throwable) {
// ...
}
override fun onComplete() {
// ...
}
override fun onNext(t: LatLng) {
// ... CPU-heavy task
request(1)
}
})
К сожалению, это не работает, так как каждое LatLng
исходящее сообщение доставляется подписчику, и никакие значения не пропускаются, как и должно быть.
Ответ №1:
Проблема в том, что observeOn
всегда можно буферизировать хотя бы один элемент. Вы можете достичь желаемого эффекта delay
, хотя:
location
.toFlowable(BackpressureStrategy.LATEST)
.delay(0, TimeUnit.SECONDS, Schedulers.computation())
.subscribeWith(object : DisposableSubscriber<LatLng>() {
override fun onStart() {
request(1)
}
override fun onError(t: Throwable) {
// ...
}
override fun onComplete() {
// ...
}
override fun onNext(t: LatLng) {
// ... CPU-heavy task
request(1)
}
})
Дополнительные замечания:
subscribeOn
не имеет практического эффекта с aSubject
и может быть проигнорирован- Лучше
toFlowable
всего применять вблизи источника, чтобы избежать ненужного затопленияdelay
предметами. - Если вы не возражаете против сторонних операторов, есть оператор observeOnLatest, который может сделать то же самое.