#java #android #multithreading #rx-java
Вопрос:
Я изучаю RxJava, чтобы посмотреть, смогу ли я использовать его для замены устаревших асинхронных задач в приложении, созданном несколько лет назад.
мой вариант использования выглядит следующим образом:
- сделайте http — запрос на Schedulers.io это возвращает несколько строк
- обрабатывайте строки отдельно, в параллельных потоках
- обновите пользовательский интерфейс в главном потоке только после обработки всех строк
есть ли способ легко выполнить шаг 2 в rx java?
Ниже приведен пример кода.
Спасибо
Observable.fromCallable(()-> {
// 1- get rows form server
ArrayList<HashMap<String, Object>> rows = new ArrayList<HashMap<String, Object>>();
// 2- process rows
for (HashMap row : rows) {
//manipulate row
row.put("test", "test"); <-- code that I want to parallelize
}
return rows;
})
.subscribeOn(Schedulers.io())// Execute in IO thread, i.e. background thread.
.observeOn(AndroidSchedulers.mainThread())// report or post the result to main thread.
.subscribeWith(new Observer<ArrayList<HashMap<String, Object>>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull ArrayList<HashMap<String, Object>> hashMaps) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
//3- update UI....
}
});
Комментарии:
1. Это должно быть довольно простой задачей с RxJS. Загвоздка в том, что JavaScript однопоточен, поэтому, скажем, «в параллельных потоках» невозможно. Однако вы можете включить их все в цикл событий сразу.
2. извините, что я написал RxJS, но я имею в виду RxJava
Ответ №1:
После нескольких попыток я пришел к этому решению.
Добавив журналы в функцию processRow, я увидел, что она вызывается параллельно для нескольких строк, как всегда, в конце вызывается завершение.
Observable.fromCallable(() -> getListResponse()) // 1- get rows form server
.subscribeOn(Schedulers.io())
.flatMapIterable(rowItem -> rowItem)
.flatMap(val -> Observable.just(val) //paralelize
.subscribeOn(Schedulers.computation())
.map(i -> processRow(i) )) // 2- process rows in parallel threads
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object listResponse) { }
@Override
public void onError(@NonNull Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
//3- update UI....
}
});