Android RxJava обрабатывает строки http-ответов в отдельных потоках

#java #android #multithreading #rx-java

Вопрос:

Я изучаю RxJava, чтобы посмотреть, смогу ли я использовать его для замены устаревших асинхронных задач в приложении, созданном несколько лет назад.

мой вариант использования выглядит следующим образом:

  1. сделайте http — запрос на Schedulers.io это возвращает несколько строк
  2. обрабатывайте строки отдельно, в параллельных потоках
  3. обновите пользовательский интерфейс в главном потоке только после обработки всех строк

есть ли способ легко выполнить шаг 2 в rx java?

Ниже приведен пример кода.

Спасибо

 Observable.fromCallable(()-> {

    // 1- get rows form server
    ArrayList<HashMap<String, Object>> rows = new ArrayList<HashMap<String, Object>>();

    // 2- process rows 
    for (HashMap row : rows) {
        //manipulate row
        row.put("test", "test");  <-- code that I want to parallelize
    }

    return rows;
})
        .subscribeOn(Schedulers.io())// Execute in IO thread, i.e. background thread.
        .observeOn(AndroidSchedulers.mainThread())// report or post the result to main thread.
        .subscribeWith(new Observer<ArrayList<HashMap<String, Object>>>() {

            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull ArrayList<HashMap<String, Object>> hashMaps) {

            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {
                //3- update UI....
            }
        }); 
 

Комментарии:

1. Это должно быть довольно простой задачей с RxJS. Загвоздка в том, что JavaScript однопоточен, поэтому, скажем, «в параллельных потоках» невозможно. Однако вы можете включить их все в цикл событий сразу.

2. извините, что я написал RxJS, но я имею в виду RxJava

Ответ №1:

После нескольких попыток я пришел к этому решению.

Добавив журналы в функцию processRow, я увидел, что она вызывается параллельно для нескольких строк, как всегда, в конце вызывается завершение.

 Observable.fromCallable(() -> getListResponse()) // 1- get rows form server
                .subscribeOn(Schedulers.io())
                .flatMapIterable(rowItem -> rowItem) 
                .flatMap(val -> Observable.just(val) //paralelize
                        .subscribeOn(Schedulers.computation())
                        .map(i -> processRow(i) )) // 2- process rows in parallel threads
                .observeOn(AndroidSchedulers.mainThread()) 
                .subscribe(new Observer<Object>() {

                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        
                    }

                    @Override
                    public void onNext(@NonNull Object listResponse) { }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        
                        e.printStackTrace();
                        
                    }

                    @Override
                    public void onComplete() {
                       //3- update UI....
                    }
                });