как обрабатывать ошибки RxAndroid в основном потоке

#error-handling #rx-java #rx-android

#обработка ошибок #rx-java #rx-android

Вопрос:

Я новичок в RxJava / Android и удивлен, что мой onError лямбда-код иногда вызывается в главном потоке, а иногда и нет, хотя я использую .observeOn(AndroidSchedulers.mainThread())

Пример 1: onError в основном потоке
это работает так, как ожидалось: onError вызывается в основном потоке

 Observable.error(new RuntimeException("RTE"))
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(s -> {
                Log.e(TAG, "onNext(" s ")-thread: "   Thread.currentThread().getName());
            },
            throwable -> {
                Log.e(TAG, "onError()-thread: "   Thread.currentThread().getName());
            });
  

лог-вывод:

 onError()-thread: main
  

Пример 2: onError НЕ в основном потоке

 Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("one and only");
    }
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.timeout(1, TimeUnit.SECONDS)
.subscribe(s -> {
        Log.e(TAG, "onNext(" s ")-thread: "   Thread.currentThread().getName());
    },
    throwable -> {
        Log.e(TAG, "onError()-thread: "   Thread.currentThread().getName());
    });
  

результат выглядит примерно так:

 onNext(one and only)-thread: main
onError()-thread: RxComputationScheduler-4       
  

Я думал, что после вызова observeOn(AndroidSchedulers.MainThread()) ВСЕ эмиссии должны выполняться в основном потоке.

итак, у меня есть эти вопросы:

  1. Я не смог найти никакой документации, в которой указывалось бы, при каких обстоятельствах onError вызывается какой поток. Кто-нибудь знает ссылку?
  2. Я, конечно, хочу отобразить некоторую индикацию ошибок в графическом интерфейсе: итак, как я могу заставить onError ВСЕГДА вызываться в основном потоке?

Ответ №1:

Я только что узнал, что мне нужно только изменить порядок вызовов. Когда я звоню observeOn после timeout того, как он работает, как ожидалось:

 .timeout(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
  

лог-вывод

 onNext(one and only)-thread: main
onError()-thread: main
  

Причина в том, что observeOn это повлияет только на все, что находится ниже вызова, и только до тех пор, пока какой-нибудь другой оператор снова не изменит поток. В приведенном выше примере timeout() изменится на поток вычислений.

Обратите внимание, что subscribeOn это работает по-другому. Не имеет значения, где в цепочке вы это вызываете.
Вы должны вызывать его только один раз (когда вы вызываете его несколько раз, первый вызов выигрывает: см. «Несколько подписок» в этом блоге)

Вот хороший пост в блоге с более подробной информацией: RxJava- понимание observeOn () и subscribeOn ()

Ответ №2:

Потому .timeout(1, TimeUnit.SECONDS) что по умолчанию работает Schedulers.computation() .

Комментарии:

1. но observeOn() должен переопределять это значение по умолчанию, верно?

2. .тайм-аут (1, TimeUnit. СЕКУНД, [желаемый планировщик])