Как правильно применить противодавление к объекту публикации?

#kotlin #rx-java #reactivex

Вопрос:

У меня есть PublishSubject приложение, которое выдает обновления местоположения ( LatLng ):

 val location = PublishSubject.create<LatLng>()
 

Теперь я хочу сделать что-то с этим местоположением, что может занять некоторое время и должно быть сделано последовательно:

 location
    .observeOn(Schedulers.computation())
    .subscribeOn(Schedulers.io())
    .subscribe {
        // ... CPU-heavy operation
    }
 

Только после завершения каждой операции в subscribe программе может быть запущена следующая. Более того, каждое обновление делает предыдущее устаревшим, поэтому меня интересует только последнее значение. Таким образом, подписчик должен получать только последнее значение на данный момент при каждом обновлении. Поэтому я подумал о применении противодавления:

 location
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .toFlowable(BackpressureStrategy.LATEST)
    .subscribeWith(object : DisposableSubscriber<LatLng>() {
        override fun onStart() {
            request(1)
        }

        override fun onError(t: Throwable) {
            // ...
        }

        override fun onComplete() {
            // ...
        }

        override fun onNext(t: LatLng) {
            // ... CPU-heavy task
            request(1)
        }
    })
 

К сожалению, это не работает, так как каждое LatLng исходящее сообщение доставляется подписчику, и никакие значения не пропускаются, как и должно быть.

Ответ №1:

Проблема в том, что observeOn всегда можно буферизировать хотя бы один элемент. Вы можете достичь желаемого эффекта delay , хотя:

 location
    .toFlowable(BackpressureStrategy.LATEST)
    .delay(0, TimeUnit.SECONDS, Schedulers.computation())
    .subscribeWith(object : DisposableSubscriber<LatLng>() {
        override fun onStart() {
            request(1)
        }

        override fun onError(t: Throwable) {
            // ...
        }

        override fun onComplete() {
            // ...
        }

        override fun onNext(t: LatLng) {
            // ... CPU-heavy task
            request(1)
        }
    })
 

Дополнительные замечания:

  1. subscribeOn не имеет практического эффекта с a Subject и может быть проигнорирован
  2. Лучше toFlowable всего применять вблизи источника, чтобы избежать ненужного затопления delay предметами.
  3. Если вы не возражаете против сторонних операторов, есть оператор observeOnLatest, который может сделать то же самое.