#android #kotlin #rx-kotlin
#java #rx-java
Вопрос:
Я использую RxJava и хочу объединить 12 различных наблюдаемых с помощью оператора combineLatest
.
Я видел прототип функции, который принимает список наблюдаемых и реализацию FuncN
но я не уверен, как это сделать, у меня возникли проблемы с реализацией call
метода.
Может кто-нибудь показать мне пример?
Комментарии:
1. Пожалуйста, отформатируйте свой код и функции, используя
``
Ответ №1:
Существует, combineLatest
который принимает List
несколько наблюдаемых. Вот пример того, как его использовать:
List<Observable<?>> list = Arrays.asList(Observable.just(1), Observable.just("2"));
Observable.combineLatest(list, new FuncN<String>() {
@Override
public String call(Object... args) {
String concat = "";
for (Object value : args) {
if (value instanceof Integer) {
concat = (Integer) value;
} else if (value instanceof String) {
concat = (String) value;
}
}
return concat;
}
});
Комментарии:
1. Как мне поступить, если у меня разные наблюдаемые типы? (например. Наблюдаемый <A>, наблюдаемый<B> и т.д.)
2. @user3762200 значения внутри
args
будут иметь тот же порядок, что и вlist
. Вы можете либо полагаться на этот порядок, либо проверять тип каждого значения. в зависимости от вашего варианта использования. Я отредактировал свой ответ.3. Спасибо, это действительно помогло мне
4. Как
FuncN<String>()
определяется? Кажется, я не могу вызвать это.
Ответ №2:
Вы расширяете этот ответ, я использую его для одновременного чтения нескольких характеристик, это можно сделать следующим образом:
connectionObservable
.flatMap((Func1<RxBleConnection, Observable<?>>) rxBleConnection -> {
List<Observable<?>> list1 = Arrays.asList(
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...));
return Observable.combineLatest(list1, args -> {
Object o = doSomethingWithResults(args);
return o;
});
})
.observeOn(AndroidSchedulers.mainThread())
.doOnUnsubscribe(this::clearConnectionSubscription)
.subscribe(retVal -> {
Log.d(TAG, "result:" retVal.toString());
Log.w(TAG, "SUCCESS");
triggerDisconnect();
}, MyActivity.this::onReadFailure);
}
Комментарии, если у вас есть предложения о том, как улучшить этот процесс.
Комментарии:
1. Что происходит, когда одна из этих характеристик выдает ошибку onError?
2. Это переходит в onReadFailure, в моем случае я хочу выбросить все чтение, если одно из них завершится неудачей, чтобы оно работало для меня.
3. Замените каждую строку
rxBleConnection.readCharacteristic(UUID...),
наrxBleConnection.readCharacteristic(UUID...).onErrorResumeNext { bytes -> Observable.just(new byte[0])) },
, по сути, вы возвращаете пустой массив байтов, если определенная характеристика не найдена. код будет продолжен
Ответ №3:
RxKotlin поддерживает до 9 операторов в параметрах в методе combineLatest(), но использовать более 9 параметров означает передавать неограниченный список динамических пользовательских объектов. Вы можете использовать его, как показано ниже:
Сначала позвольте мне привести вам простой пример только с двумя параметрами с пользовательскими типами данных
val name = Observable.just("MyName")
val age = Observable.just(25)
Observables.combineLatest(name, age) { n, a -> "$n - age:${a}" }
.subscribe({
Log.d("combineLatest", "onNext - ${it}")
})
Что теперь, если я хочу передать несколько параметров в combineLatest? Затем
ваш ответ ниже: (я использовал пользовательские типы данных, поэтому кто-то
пользовательская проблема также может быть решена здесь)
val myList = arrayOf(Observable.just("MyName"),
Observable.just(2),
Observable.just(3.55),
Observable.just("My Another String"),
Observable.just(5),
Observable.just(6),
Observable.just(7),
Observable.just(8),
Observable.just(9),
Observable.just(10),
Observable.just(11),
Observable.just(12),
Observable.just(13),
Observable.just(14),
Observable.just(15))
Observable.combineLatest(myList, {
val a = it[0] as String
val b = it[1] as Int
val c = it[2] as Float
val d = it[3] as String
val e = it[4] as Int
val f = it[5] as Int
val g = it[6] as Int
val h = it[7] as Int
val i = it[8] as Int
val j = it[9] as Int
val k = it[10] as Int
val l = it[11] as Int
val m = it[12] as Int
"$a - age:${b}" })
.subscribe({
Log.d("combineLatest", "onNext - ${it}")
})
Комментарии:
1. Что вы делаете с непроверенным приведением? Если все мои наблюдаемые в моем списке одинаковы, я знаю, что это за тип, но он все равно выдает предупреждение о непроверенном приведении
Ответ №4:
Вот простая функция расширения для RxKotlin, если у вас есть 10 исходных текстов для combineLatest
. Вы можете легко создать аналогичные функции для большего количества источников или адаптировать это для работы с обычным RxJava.
import io.reactivex.Observable
import io.reactivex.rxkotlin.Observables
@Suppress("UNCHECKED_CAST", "unused")
inline fun <T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any, T6 : Any, T7 : Any, T8 : Any, T9 : Any, T10 : Any, R : Any> Observables.combineLatest(
source1: Observable<T1>, source2: Observable<T2>,
source3: Observable<T3>, source4: Observable<T4>,
source5: Observable<T5>, source6: Observable<T6>,
source7: Observable<T7>, source8: Observable<T8>,
source9: Observable<T9>, source10: Observable<T10>,
crossinline combineFunction: (T1, T2, T3, T4, T5, T6, T7, T8, T9, T10) -> R
): Observable<R> =
Observable.combineLatest(arrayOf(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10)) {
combineFunction(
it[0] as T1,
it[1] as T2,
it[2] as T3,
it[3] as T4,
it[4] as T5,
it[5] as T6,
it[6] as T7,
it[7] as T8,
it[8] as T9,
it[9] as T10
)
}
Примечание: Я создал это как функцию расширения, чтобы соответствовать тому, как combineLatest
выглядят вызовы функций для менее чем 10 источников ( Observables.combineLatest(...)
). Таким образом, мне не нужно думать о том, какая combineLatest
версия мне нужна для какого количества параметров. Технически нет необходимости делать это функцией расширения.
Ответ №5:
Чтобы расширить ответ Егора Нелюбы, вы можете объединить все результаты внутри объекта container, а затем использовать его так, как вы будете внутри предложения subscribe:
List<Observable<?>> list = new ArrayList<>();
list.add(mCreateMarkupFlowManager.getFlowState());
list.add(mCreateIssueFlowStateManager.getIssueFlowState());
list.add(mViewerStateManager.getMarkupLoadingProgressChanges());
list.add(mViewerStateManager.getIssueLoadingProgressChanges());
list.add(mMeasurementFlowStateManager.getFlowState());
list.add(mViewerStateManager.isSheetLoaded());
list.add(mProjectDataManager.isCreateFieldIssueEnabledForCurrentProject().distinctUntilChanged());
list.add(mViewerStateManager.getMarkupViewMode());
list.add(mViewerStateManager.isFirstPerson());
list.add(mProjectDataManager.isCreateRfiEnabledForCurrentProject().distinctUntilChanged());
list.add(mCreateRfiFlowStateManager.getRfiFlowState());
attachSubscription(Observable.combineLatest(list, args -> {
Holder holder = new Holder();
holder.setFirst((String) args[0]);
holder.setSecond((Integer) args[1]);
holder.setThird((Boolean) args[2]);
holder.setFourth((Boolean) args[3]);
holder.setFifth((String) args[4]);
holder.setSixth((Boolean) args[5]);
holder.setSeventh((Boolean) args[6]);
holder.setEighth((Boolean) args[7]);
holder.setNinth((Boolean) args[8]);
holder.setTenth((Boolean) args[9]);
holder.setEleventh((String) args[10]);
return holder;
})
.filter(holder -> Util.isTrue(holder.sixth))
.compose(Util.applySchedulers())
.subscribe(holder -> {
if (isViewAttached()) {
String createMarkupState = holder.first;
Integer createIssueState = holder.second;
boolean markupsLoadingFinished = holder.third;
boolean issuesLoadingFinished = holder.fourth;
boolean loadingFinished = markupsLoadingFinished amp;amp; issuesLoadingFinished;
String measurementState = holder.fifth;
boolean isMarkupLockMode = holder.eighth;
boolean showCreateMarkupButton = shouldShowCreateMarkupButton();
boolean showCreateMeasureButton = shouldShowMeasureButton();
boolean showCreateFieldIssueButton = holder.seventh;
boolean isFirstPersonEnabled = holder.ninth;
Boolean showCreateRfiButton = holder.tenth;
String rfiFlowState = holder.eleventh;
}
})
);
public class Holder {
public String first;
public Integer second;
public Boolean third;
public Boolean fourth;
public String fifth;
public Boolean sixth;
public Boolean seventh;
public Boolean eighth;
public Boolean ninth;
public Boolean tenth;
public String eleventh;
public void setEleventh(String eleventh) {
this.eleventh = eleventh;
}
public void setFirst(String first) {
this.first = first;
}
public void setSecond(Integer second) {
this.second = second;
}
public void setThird(Boolean third) {
this.third = third;
}
public void setFourth(Boolean fourth) {
this.fourth = fourth;
}
public void setFifth(String fifth) {
this.fifth = fifth;
}
public void setSixth(Boolean sixth) {
this.sixth = sixth;
}
public void setSeventh(Boolean seventh) {
this.seventh = seventh;
}
public void setEighth(Boolean eighth) {
this.eighth = eighth;
}
public void setNinth(Boolean ninth) {
this.ninth = ninth;
}
public void setTenth(Boolean tenth) {
this.tenth = tenth;
}
public Holder() {}
}