Как кэшировать / воспроизводить результат промежуточных шагов с помощью RxJava Observable, имея возможность отменить и возобновить в середине?

#rx-java #reactive-programming

#rx-java #реактивное программирование

Вопрос:

Моя цель состоит в том, чтобы выполнить несколько шагов действия, например:

  1. Получить токен.
  2. Используйте токен для инициализации API.
  3. Как только API будет инициализирован, начните наблюдение за данными.

Сложность здесь в том, что у меня также есть a mStateChangeObservable , которые излучают сигналы включения и выключения (например, Wi-Fi включен или выключен), и я должен иметь возможность отменить последовательность действий в середине, но каждый раз, когда сигнал «включен», я не должен перезапускаться с самого начала. В настоящее время я смог добиться этого, используя приведенный ниже код, но это просто сложно поддерживать. Я ищу более элегантный способ добиться этого с помощью операторов Rx.

Все, что имеет префикс, m является переменной-членом экземпляра класса.

 mStateChangeObservable = new StateChange().getObservable().replay(1).refCount();

Observable<String> dataObservable =
    Single.<Api>create(
            emitter -> {
              if (mToken == null) {
                emitter.setDisposable(
                    getToken()
                        .subscribe(
                            token -> {
                              mToken = token;

                              emitter.setDisposable(
                                  getApi()
                                      .subscribe(
                                          api -> {
                                            mApi = api;
                                            emitter.onSuccess(mApi);
                                          }));
                            }));
              } else if (mApi == null) {
                emitter.setDisposable(
                    getApi()
                        .subscribe(
                            api -> {
                              mApi = api;
                              emitter.onSuccess(mApi);
                            }));
              } else {
                emitter.onSuccess(mApi);
              }
            })
        .flatMapObservable(Api::getDataObservable);

mDataDisposable =
    mStateChangeObservable
        .switchMap(
            isReady -> {
              Log.d(TAG, "#1 Is ready = "   isReady);

              if (isReady) {
                return dataObservable;
              } else {
                return Observable.never();
              }
            })
        .subscribe(data -> Log.d(TAG, "api data = "   data));
 

Ответ №1:

Я нашел немного более элегантный способ, но все же предпочел бы, чтобы у кого-то был способ получше:

 mStateChangeObservable = new StateChange().getObservable().replay(1).refCount();

Observable<String> dataObservable =
    Single.defer(
            () -> {
              if (mToken == null) {
                return getToken().doOnSuccess(token -> mToken = token);
              } else {
                return Single.just(mToken);
              }
            })
        .flatMap(
            token -> {
              if (mApi == null) {
                return getApi().doOnSuccess(api -> mApi = api);
              } else {
                return Single.just(mApi);
              }
            })
        .flatMapObservable(Api::getDataObservable);

mDataDisposable =
    mStateChangeObservable
        .switchMap(
            isReady -> {
              Log.d(TAG, "#1 Is ready = "   isReady);

              if (isReady) {
                return dataObservable;
              } else {
                return Observable.never();
              }
            })
        .subscribe(data -> Log.d(TAG, "api data = "   data));