#rx-java
#rx-java
Вопрос:
Я пробовал ниже в RxJva 2.0.0-RC3.
Flowable.interval(10, TimeUnit.MILLISECONDS)
.doOnNext(System.out::println)
.observeOn(Schedulers.computation(), false, 10)
.subscribe(new ResourceSubscriber<Long>() {
@Override
protected void onStart() {
request(1);
}
@Override
public void onNext(Long t) {
try {
Thread.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("received: " t);
request(1L);
}
...
});
Затем я получил следующий результат.
0
1
2
3
received: 0
4
5
6
received: 1
7
8
9
received: 2
received: 3
io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 10 due to lack of requests
...
Я ожидал получить исключение MissingBackpressureException после того, как поток выдаст около 13, потому что 0, 1, 2 и 3 уже были отправлены, поэтому количество буферизованных элементов может быть 6, когда поток выдал 10. Похоже, что метод запроса не повлияет на количество буферизованных элементов.
Я попробовал аналогичный код в RxJava 1.2.0 и получил другой результат.
...
20
21
received: 6
22
23
24
received: 7
25
rx.exceptions.MissingBackpressureException
...
Результат версии 1.2.0 также сбивает с толку, поскольку при возникновении исключения количество буферизованных элементов может составлять 18 (от 8 до 25), а количество буферизованных элементов будет больше 10.
Это подходящее поведение? Я думал, что исключение MissingBackpressureException будет вызвано, когда количество буферизованных элементов станет больше размера буфера.
Причина, по которой я хочу исключение MissingBackpressureException, заключается в том, что я хочу остановить поток или наблюдаемый, как только количество ожидающих элементов превысит размер буфера.
Итак, я могу быстро попросить пользователей подождать некоторое время, когда система будет занята.
Ответ №1:
В 1.x interval
игнорируется противодавление. В обеих версиях предварительная выборка из 10 создаст внутренний буфер из 16 элементов observeOn
, но 1.x interval
с радостью переполнит этот буфер. Кроме того, observeOn
запрашивает больше от своего источника после доставки 75% от суммы предварительной выборки (~ 7). В 1.x это не имеет значения, и по мере перемещения обоих концов вы получаете MBE примерно через 24 элемента. В 2.x доставляется первоначальный запрос из 10, но поскольку за это время потребляется только 3, observeOn
повторный запрос не выполняется, и вы получаете немедленный MBE.
Комментарии:
1. Спасибо за информацию! Теперь я понимаю механизм. Однако мне интересно, почему механизм кэширования будет таким. Это из-за производительности?