Как я могу продолжить цепочку Rx разными способами в зависимости от свойства?

#android #rx-java #rx-android

#Android #rx-java #rx-android

Вопрос:

У меня есть метод, в котором на основе значения текучих данных мне нужно продолжить цепочку RX разными способами. я имею в виду, что если свойство IsOnline объекта данных равно true, то мне нужно вызвать scan(initial,selector), но если оно равно false, то мне нужно вызвать scan(selector)

 
 @NotNull
    public Flowable<Data> initialCall(
            @NotNull Flowable<Info> info, Data initial) {
        return  info
                .map()
                .switchMap(it -> call(Flowable.just(it),initial, it.isOnline));
    }



   private Flowable<Data> call (
            Flowable<A> a, 
            Data initial, boolean isOnline
    ) {
        return Flowable.combineLatest(
                a,
                b,
                (a, b) -> {
                    return ....;
                })
                .switchMap()
    ///here based on the Data isOnline property I need to call either 
    ///scan(initial, selector) or scan(selector) and then continue.... 
                .map()
                .distinctUntilChanged()
                .toObservable()
                .compose()
                .compose()
                .toFlowable(BUFFER)
    }
 

Комментарии:

1. Должны ли оба scan возвращать один и тот же возвращаемый тип?

2. Да, тот же возвращаемый тип

3. Не могли бы вы привести пример, как выглядит подпись обоих сканирований? switchMap возвращает «Данные» и сканирует с помощью селекторных отображений из (данные, данные) -> данные, но ваше начальное значение имеет тип C. Это не сработает, потому что scan(c, (prev, curr) -> {возвращает тип c }) . Это возможно только тогда, когда C имеет тип Data . Не могли бы вы уточнить?

4. Извините, я просто попытался упростить код и допустил ошибки. да, начальный тип данных

5. Я также хочу упомянуть, что свойство, основанное на нем, должно сканировать (начальное, селекторное) или сканировать (), должно быть, у нас может быть это свойство, когда мы также вызываем метод вызова.

Ответ №1:

Это то, что вы хотите? На основе isOnline a scan -operator применяется со seed значением или без него.

 import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableTransformer;
import org.junit.jupiter.api.Test;

class So65349760 {
  private static <C extends Data> FlowableTransformer<Data, Data> scan(
      Boolean isOnline, C initialValue) {
    if (isOnline) {
      return upstream -> {
        return upstream.scan(
            initialValue,
            (prev, current) -> {
              return new Data();
            });
      };
    } else {
      return upstream -> upstream.scan((prev, current) -> new Data());
    }
  }

  @Test
  void so65349760() {
    Flowable<Integer> a = Flowable.fromCallable(() -> 1);
    Flowable<String> b = Flowable.fromCallable(() -> "42");

    Data seed = new Data();

    call(a, b, seed, false).test().assertValueCount(1);
    call(a, b, seed, true).test().assertValueCount(2);
  }

  private <A, B, C extends Data> Flowable<Data> call(
      Flowable<A> a, Flowable<B> b, C init, boolean isOnline) {
    return Flowable.combineLatest(a, b, (v1, v2) -> 42)
        .switchMap(integer -> Flowable.just(new Data()))
        .compose(scan(isOnline, init))
        .map(d -> d)
        .distinctUntilChanged()
        .toObservable()
        .toFlowable(BackpressureStrategy.BUFFER);
  }

  private static class Data {}
}
 

Комментарии:

1. Спасибо @Hans Wurst, насколько я понимаю, scan — это метод, который вы определили, но я. не уверен, откуда берется восходящий поток? Я понимаю, что это поток, который мы имеем после switchmap, но не понимаю, как у нас есть доступ при сканировании?

2. Восходящий поток обеспечивается compose . Просто посмотрите на определение интерфейса FlowableTransformer<@NonNull Upstream, @NonNull Downstream> { Publisher<Downstream> apply(@NonNull Flowable<Upstream> upstream) } . scan Метод реализует трансформатор через лямбда-выражение. Вы также можете использовать реализацию антонимичного класса. Flowable#compose дает вам свободу извлекать операторы в другое место и применять их к любому соответствию Flowable . Дальнейшее чтение: blog.danlew.net/2015/03/02/dont-break-the-chain

3. Большое спасибо, вы действительно искусны в Rx.

4. У меня есть еще один вопрос, который не входит в эту область вопроса, но в статье, которой вы поделились, говорится о flatmap против compose , но flatmap в качестве входных данных принимает простые данные, но compose преобразует поток данных, да? причину, по которой я заинтересован, вы можете увидеть в коде, которым я поделился в initialCall У меня есть switchmap, поскольку flatmap не поддерживает порядок, я использовал switchmap, мне интересно, можно ли поддерживать порядок с помощью compose???? Я знаю о concatmap и concatmapeager, но их не выгодно использовать.

5. извините за длинное сообщение