Влияет ли метод запроса на количество буферизованных элементов?

#rx-java

#rx-java

Вопрос:

Я пробовал ниже в RxJva 2.0.0-RC3.

 Flowable.interval(10, TimeUnit.MILLISECONDS)
    .doOnNext(System.out::println)
    .observeOn(Schedulers.computation(), false, 10)
    .subscribe(new ResourceSubscriber<Long>() {

      @Override
      protected void onStart() {
        request(1);
      }

      @Override
      public void onNext(Long t) {
        try {
          Thread.sleep(30);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        System.out.println("received: "   t);
        request(1L);
      }
      ...
    });
  

Затем я получил следующий результат.

 0
1
2
3
received: 0
4
5
6
received: 1
7
8
9
received: 2
received: 3
io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 10 due to lack of requests
...
  

Я ожидал получить исключение MissingBackpressureException после того, как поток выдаст около 13, потому что 0, 1, 2 и 3 уже были отправлены, поэтому количество буферизованных элементов может быть 6, когда поток выдал 10. Похоже, что метод запроса не повлияет на количество буферизованных элементов.

Я попробовал аналогичный код в RxJava 1.2.0 и получил другой результат.

 ...
20
21
received: 6
22
23
24
received: 7
25
rx.exceptions.MissingBackpressureException
...
  

Результат версии 1.2.0 также сбивает с толку, поскольку при возникновении исключения количество буферизованных элементов может составлять 18 (от 8 до 25), а количество буферизованных элементов будет больше 10.

Это подходящее поведение? Я думал, что исключение MissingBackpressureException будет вызвано, когда количество буферизованных элементов станет больше размера буфера.

Причина, по которой я хочу исключение MissingBackpressureException, заключается в том, что я хочу остановить поток или наблюдаемый, как только количество ожидающих элементов превысит размер буфера.

Итак, я могу быстро попросить пользователей подождать некоторое время, когда система будет занята.

Ответ №1:

В 1.x interval игнорируется противодавление. В обеих версиях предварительная выборка из 10 создаст внутренний буфер из 16 элементов observeOn , но 1.x interval с радостью переполнит этот буфер. Кроме того, observeOn запрашивает больше от своего источника после доставки 75% от суммы предварительной выборки (~ 7). В 1.x это не имеет значения, и по мере перемещения обоих концов вы получаете MBE примерно через 24 элемента. В 2.x доставляется первоначальный запрос из 10, но поскольку за это время потребляется только 3, observeOn повторный запрос не выполняется, и вы получаете немедленный MBE.

Комментарии:

1. Спасибо за информацию! Теперь я понимаю механизм. Однако мне интересно, почему механизм кэширования будет таким. Это из-за производительности?