Дождаться завершения Observable перед выполнением другого Observable?

#android #rx-java #rx-android

#Android #rx-java #rx-android

Вопрос:

Проблема

У меня есть действие, которое периодически извлекает данные из API и отображает полученные данные. API использует OAuth, поэтому я получаю временный токен доступа, срок действия которого истекает через определенный период времени (1 час). Если приложение попытается получить данные с истекшим токеном, очевидно, что запрос завершится ошибкой. На более ранней итерации моего приложения я использовал AsyncTasks для сетевых запросов и, по сути, просто выполнил новую AsyncTask, которая получила бы новый токен доступа перед вызовом основной AsyncTask, которая извлекает данные с сервера. Это отлично сработало, потому что основная AsyncTask будет ждать, пока другая не будет завершена перед выполнением.

Недавно я переключился на RxJava и в основном просто заменил AsyncTasks на Observables. Проблема в том, что основной Observable, который извлекает данные, не ожидает завершения Observable, который обновляет токен доступа. Вот мой код, спасибо за вашу помощь.

Код

LiveThreadActivity.java

 private Subscription subscription;
private Observable<List<CustomComment>> fetchData;

@Override
protected void onResume() {
    super.onResume();

    if (tokenExpired()) {
        auth.refreshToken();
    }

    subscription = fetchData
            .compose(bindToLifecycle())
            .retryWhen(new RetryWithDelay(5, 2000))
            .subscribe(list -> addNewComments(list), e -> handleFetchDataError(e));

}


// This method gets called in onCreate()
private void dataCollection() {
    fetchData = Observable.interval(0, REFRESH_RATE, TimeUnit.MILLISECONDS)
            .map(tick -> fetchNewComments())            // Run function every time a tick is emitted
            .retryWhen( new RetryWithDelay(2, 2000) )   // Retry twice with 2 second delay
            .subscribeOn(Schedulers.io())               // Network stuff in background thread
            .observeOn(AndroidSchedulers.mainThread()); // Other stuff on the main thread

}
  

Auth.java

 public class Auth {
    ...

    public void refreshToken() {
        Observable.just(1)
                .map(y -> refreshAccessToken())
                .retryWhen( new RetryWithDelay(3, 2000) )
                .subscribeOn(Schedulers.io())
                .subscribe();
    }
}
  

Ответ №1:

При использовании реактивных библиотек необходим новый способ мышления. Вы должны писать код синхронно, но имейте в виду, что он выполняется асинхронно.

Ваш код просто выполняется синхронно. Он выполняет два Observable одновременно.

Функция refreshToken() должна выглядеть следующим образом:

 public Observable<?> refreshToken() {
    return Observable.just(1)
            .map(y -> refreshAccessToken())
            .retryWhen( new RetryWithDelay(3, 2000) )
            .subscribeOn(Schedulers.io());
}
  

И onResume() :

 @Override
protected void onResume() {
    super.onResume();

    Observable obs = fetchData
            .compose(bindToLifecycle())
            .retryWhen(new RetryWithDelay(5, 2000));

    if (tokenExpired()) {
        obs = obs.startWith(auth.refreshToken());
    }

    subscription = obs
            .subscribe(list -> addNewComments(list), e -> handleFetchDataError(e));

}
  

Обратите внимание на startWith() оператор. Это позволяет выполнять одно Observable (выборка списка) за другим (обновление токена).

Ответ №2:

.flatMap (), вероятно, будет достаточным, т. Е. tokenObservable.flatMap(/* возвращает dataObservable */)