Как использовать оператор RxJava combineLatest с более чем 9 наблюдаемыми

#android #kotlin #rx-kotlin

#java #rx-java

Вопрос:

Я использую RxJava и хочу объединить 12 различных наблюдаемых с помощью оператора combineLatest .

Я видел прототип функции, который принимает список наблюдаемых и реализацию FuncN но я не уверен, как это сделать, у меня возникли проблемы с реализацией call метода.

Может кто-нибудь показать мне пример?

Комментарии:

1. Пожалуйста, отформатируйте свой код и функции, используя ``

Ответ №1:

Существует, combineLatest который принимает List несколько наблюдаемых. Вот пример того, как его использовать:

 List<Observable<?>> list = Arrays.asList(Observable.just(1), Observable.just("2"));
Observable.combineLatest(list, new FuncN<String>() {
    @Override
    public String call(Object... args) {
        String concat = "";
        for (Object value : args) {
            if (value instanceof Integer) {
                concat  = (Integer) value;
            } else if (value instanceof String) {
                concat  = (String) value;
            }
        }
        return concat;
    }
});
  

Комментарии:

1. Как мне поступить, если у меня разные наблюдаемые типы? (например. Наблюдаемый <A>, наблюдаемый<B> и т.д.)

2. @user3762200 значения внутри args будут иметь тот же порядок, что и в list . Вы можете либо полагаться на этот порядок, либо проверять тип каждого значения. в зависимости от вашего варианта использования. Я отредактировал свой ответ.

3. Спасибо, это действительно помогло мне

4. Как FuncN<String>() определяется? Кажется, я не могу вызвать это.

Ответ №2:

Вы расширяете этот ответ, я использую его для одновременного чтения нескольких характеристик, это можно сделать следующим образом:

 connectionObservable
                .flatMap((Func1<RxBleConnection, Observable<?>>) rxBleConnection -> {
                    List<Observable<?>> list1 = Arrays.asList(
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...));
                    return Observable.combineLatest(list1, args -> {
                       Object o =  doSomethingWithResults(args);
                        return o;
                    });
                })
                .observeOn(AndroidSchedulers.mainThread())
                .doOnUnsubscribe(this::clearConnectionSubscription)
                .subscribe(retVal -> {
                    Log.d(TAG, "result:"   retVal.toString());
                    Log.w(TAG, "SUCCESS");
                    triggerDisconnect();

                }, MyActivity.this::onReadFailure);
    }
  

Комментарии, если у вас есть предложения о том, как улучшить этот процесс.

Комментарии:

1. Что происходит, когда одна из этих характеристик выдает ошибку onError?

2. Это переходит в onReadFailure, в моем случае я хочу выбросить все чтение, если одно из них завершится неудачей, чтобы оно работало для меня.

3. Замените каждую строку rxBleConnection.readCharacteristic(UUID...), на rxBleConnection.readCharacteristic(UUID...).onErrorResumeNext { bytes -> Observable.just(new byte[0])) }, , по сути, вы возвращаете пустой массив байтов, если определенная характеристика не найдена. код будет продолжен

Ответ №3:

RxKotlin поддерживает до 9 операторов в параметрах в методе combineLatest(), но использовать более 9 параметров означает передавать неограниченный список динамических пользовательских объектов. Вы можете использовать его, как показано ниже:

Сначала позвольте мне привести вам простой пример только с двумя параметрами с пользовательскими типами данных

 val name = Observable.just("MyName")
val age = Observable.just(25)
Observables.combineLatest(name, age) { n, a -> "$n - age:${a}" }
                .subscribe({
                    Log.d("combineLatest", "onNext - ${it}")
                })
  

Что теперь, если я хочу передать несколько параметров в combineLatest? Затем
ваш ответ ниже: (я использовал пользовательские типы данных, поэтому кто-то
пользовательская проблема также может быть решена здесь)

 val myList = arrayOf(Observable.just("MyName"),
                Observable.just(2),
                Observable.just(3.55),
                Observable.just("My Another String"),
                Observable.just(5),
                Observable.just(6),
                Observable.just(7),
                Observable.just(8),
                Observable.just(9),
                Observable.just(10),
                Observable.just(11),
                Observable.just(12),
                Observable.just(13),
                Observable.just(14),
                Observable.just(15))

Observable.combineLatest(myList, {
    val a = it[0] as String
    val b = it[1] as Int
    val c = it[2] as Float
    val d = it[3] as String
    val e = it[4] as Int
    val f = it[5] as Int
    val g = it[6] as Int
    val h = it[7] as Int
    val i = it[8] as Int
    val j = it[9] as Int
    val k = it[10] as Int
    val l = it[11] as Int
    val m = it[12] as Int
    "$a - age:${b}" })
        .subscribe({
            Log.d("combineLatest", "onNext - ${it}")
        })
  

Комментарии:

1. Что вы делаете с непроверенным приведением? Если все мои наблюдаемые в моем списке одинаковы, я знаю, что это за тип, но он все равно выдает предупреждение о непроверенном приведении

Ответ №4:

Вот простая функция расширения для RxKotlin, если у вас есть 10 исходных текстов для combineLatest . Вы можете легко создать аналогичные функции для большего количества источников или адаптировать это для работы с обычным RxJava.

 
import io.reactivex.Observable
import io.reactivex.rxkotlin.Observables

@Suppress("UNCHECKED_CAST", "unused")
inline fun <T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any, T6 : Any, T7 : Any, T8 : Any, T9 : Any, T10 : Any, R : Any> Observables.combineLatest(
    source1: Observable<T1>, source2: Observable<T2>,
    source3: Observable<T3>, source4: Observable<T4>,
    source5: Observable<T5>, source6: Observable<T6>,
    source7: Observable<T7>, source8: Observable<T8>,
    source9: Observable<T9>, source10: Observable<T10>,
    crossinline combineFunction: (T1, T2, T3, T4, T5, T6, T7, T8, T9, T10) -> R
): Observable<R> =
    Observable.combineLatest(arrayOf(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10)) {
        combineFunction(
            it[0] as T1,
            it[1] as T2,
            it[2] as T3,
            it[3] as T4,
            it[4] as T5,
            it[5] as T6,
            it[6] as T7,
            it[7] as T8,
            it[8] as T9,
            it[9] as T10
        )
    }
  

Примечание: Я создал это как функцию расширения, чтобы соответствовать тому, как combineLatest выглядят вызовы функций для менее чем 10 источников ( Observables.combineLatest(...) ). Таким образом, мне не нужно думать о том, какая combineLatest версия мне нужна для какого количества параметров. Технически нет необходимости делать это функцией расширения.

Ответ №5:

Чтобы расширить ответ Егора Нелюбы, вы можете объединить все результаты внутри объекта container, а затем использовать его так, как вы будете внутри предложения subscribe:

  List<Observable<?>> list = new ArrayList<>();
    list.add(mCreateMarkupFlowManager.getFlowState());
    list.add(mCreateIssueFlowStateManager.getIssueFlowState());
    list.add(mViewerStateManager.getMarkupLoadingProgressChanges());
    list.add(mViewerStateManager.getIssueLoadingProgressChanges());
    list.add(mMeasurementFlowStateManager.getFlowState());
    list.add(mViewerStateManager.isSheetLoaded());
    list.add(mProjectDataManager.isCreateFieldIssueEnabledForCurrentProject().distinctUntilChanged());
    list.add(mViewerStateManager.getMarkupViewMode());
    list.add(mViewerStateManager.isFirstPerson());
    list.add(mProjectDataManager.isCreateRfiEnabledForCurrentProject().distinctUntilChanged());
    list.add(mCreateRfiFlowStateManager.getRfiFlowState());

    attachSubscription(Observable.combineLatest(list, args -> {
                Holder holder = new Holder();
                holder.setFirst((String) args[0]);
                holder.setSecond((Integer) args[1]);
                holder.setThird((Boolean) args[2]);
                holder.setFourth((Boolean) args[3]);
                holder.setFifth((String) args[4]);
                holder.setSixth((Boolean) args[5]);
                holder.setSeventh((Boolean) args[6]);
                holder.setEighth((Boolean) args[7]);
                holder.setNinth((Boolean) args[8]);
                holder.setTenth((Boolean) args[9]);
                holder.setEleventh((String) args[10]);
                return holder;
            })
                    .filter(holder -> Util.isTrue(holder.sixth))
                    .compose(Util.applySchedulers())
                    .subscribe(holder -> {
                        if (isViewAttached()) {
                            String createMarkupState = holder.first;
                            Integer createIssueState = holder.second;
                            boolean markupsLoadingFinished = holder.third;
                            boolean issuesLoadingFinished = holder.fourth;
                            boolean loadingFinished = markupsLoadingFinished amp;amp; issuesLoadingFinished;
                            String measurementState = holder.fifth;
                            boolean isMarkupLockMode = holder.eighth;

                            boolean showCreateMarkupButton = shouldShowCreateMarkupButton();
                            boolean showCreateMeasureButton = shouldShowMeasureButton();
                            boolean showCreateFieldIssueButton = holder.seventh;
                            boolean isFirstPersonEnabled = holder.ninth;
                            Boolean showCreateRfiButton = holder.tenth;
                            String rfiFlowState = holder.eleventh;


                        }
                    })
    );


public class Holder {


public String first;
public Integer second;
public Boolean third;
public Boolean fourth;
public String fifth;
public Boolean sixth;
public Boolean seventh;
public Boolean eighth;
public Boolean ninth;
public Boolean tenth;
public String eleventh;

public void setEleventh(String eleventh) {
    this.eleventh = eleventh;
}


public void setFirst(String first) {
    this.first = first;
}


public void setSecond(Integer second) {
    this.second = second;
}


public void setThird(Boolean third) {
    this.third = third;
}


public void setFourth(Boolean fourth) {
    this.fourth = fourth;
}


public void setFifth(String fifth) {
    this.fifth = fifth;
}


public void setSixth(Boolean sixth) {
    this.sixth = sixth;
}


public void setSeventh(Boolean seventh) {
    this.seventh = seventh;
}


public void setEighth(Boolean eighth) {
    this.eighth = eighth;
}


public void setNinth(Boolean ninth) {
    this.ninth = ninth;
}


public void setTenth(Boolean tenth) {
    this.tenth = tenth;
}


public Holder() {}

}