Как выполнять длительные вычисления в фоновом потоке в RxJava Android

#java #android #multithreading #rx-java

#java #Android #многопоточность #rx-java

Вопрос:

Я хочу выполнять длительные вычисления в фоновом потоке, используя RxJava в Android. После вычисления я пытаюсь представить результат в Recylerview. Я использую следующий фрагмент кода:

 Observable.just("true")
                .subscribeOn(Schedulers.io())
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {
                        feedlist.clear();
                        if (eventFeedItems != null amp;amp; !eventFeedItems.isEmpty()) {
                            for (int i = 0; i < eventFeedItems.size(); i  ) {
                                if (eventFeedItems != null amp;amp; eventFeedItems.get(i) != null
                                        amp;amp; ((eventFeedItems.get(i).getType() != null amp;amp; eventFeedItems.get(i).getType().equalsIgnoreCase("EVENT"))
                                        || (eventFeedItems.get(i).getActivityRequestType() != null amp;amp; eventFeedItems.get(i).getActivityRequestType().equalsIgnoreCase(EventConstants.TRENDING_ACTIVITY)))) {
                                    if (eventFeedItems.get(i).getActivityRequestType() != null amp;amp; !eventFeedItems.get(i).getActivityRequestType().equalsIgnoreCase("")) {
                                        feedlist.add(new FeedsListModel(eventFeedItems.get(i), eventFeedItems.get(i).getActivityRequestType(), null));
                                    } else if (eventFeedItems.get(i).getRequestType() != null amp;amp; !eventFeedItems.get(i).getRequestType().equalsIgnoreCase("")) {
                                        feedlist.add(new FeedsListModel(eventFeedItems.get(i), eventFeedItems.get(i).getRequestType(), null));
                                    } else
                                        feedlist.add(new FeedsListModel(eventFeedItems.get(i), EventConstants.ATTENDEE_POST, null));
                                }
                            }
                        }
                        Log.d("calculations","Completed");
                        return "";
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
//                        feed_list.setLayoutManager(mLayoutManager);
//                        feedListAdapter.notifyDataSetChanged();
                        Log.d("Adapter", "Set");
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.d("Exception", "oh! fish...");
                        throwable.printStackTrace();
                    }
                });  

С приведенным выше фрагментом кода я сталкиваюсь с помехами пользовательского интерфейса, поскольку размер ArrayList eventFeedItems составляет примерно более 300 элементов.Я новичок в RxJava. Пожалуйста, помогите мне.

Ответ №1:

вы не добьетесь параллелизма, используя map-Operator.

Первая подписка переместит все эмиссии в IO-scheduler. Здесь не происходит параллелизма.

 .subscribeOn(Schedulers.io())
  

Оператор map будет вызван синхронно из предыдущего потока. В вашем случае это был бы какой-то поток из IO-threadpool.

 .map(new Func1<String, String>() {
  

После выполнения map-Operator вы переместите значение из потока ввода-вывода в цикл событий Android-UI с помощью

 .observeOn(AndroidSchedulers.mainThread())
  

После того, как значение было переведено из потока ввода-вывода в поток пользовательского интерфейса, будет обработано следующее значение из исходного наблюдаемого.

 Observable.just("true")
  

В вашем примере больше не будет значений, потому что вы создаете только одно значение.

Для достижения параллелизма вы должны использовать flatMap вместо map. И используйте subscribeOn() в flatMap для создания каждого потока в другом потоке.

Пожалуйста, рассмотрите этот пример, чтобы увидеть, как происходит параллелизм. Каждый наблюдаемый объект будет подписан сразу, поэтому максимальное время для teset будет составлять что-то около 5 секунд. Если бы сейчас произошел параллелизм, потребовалось бы 1 2 3 4 5 секунды плюс время выполнения.

 @Test
public void name1() throws Exception {

        Observable<Integer> value = Observable.just(1_000, 2_000, 3_000, 4_000, 5_000)
                .flatMap(i -> Observable.fromCallable(() -> doWork(i)).subscribeOn(Schedulers.io())
                ).doOnNext(integer -> System.out.println("value"));

        value.test().awaitTerminalEvent();
}

private int doWork(int sleepMilli) {
        try {
            Thread.sleep(sleepMilli);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return -1;
}
  

Если вы хотите узнать больше о том, как происходит параллелизм с flatMap, пожалуйста, рассмотрите возможность чтенияhttp://tomstechnicalblog.blogspot.de/2015/11/rxjava-achieving-parallelization.html

Что касается вашего кода, я бы предложил:

  • Переместите анонимную реализацию интерфейса в реализацию частного внутреннего класса и используйте его экземпляр. Вы получите более читаемый наблюдаемый
  • Не используйте побочные эффекты для глобальных переменных из операторов внутри. Вы получите состояние гонки, если задействован параллелизм.

     List<FeedsListModel> eventFeedItems = Arrays.asList(new FeedsListModel(), new FeedsListModel());
    
    Observable<FeedsListModel> feedsListModelObservable = Observable.fromIterable(eventFeedItems)
                        .flatMap(feedsListModel -> Observable.fromCallable(() -> calculation(feedsListModel))
                                .subscribeOn(Schedulers.computation())
    );
    
    feedsListModelObservable
            .toList()
            .observeOn(Schedulers.io())
            .subscribe(feedsListModels -> {
                // do UI stuff
    });
      

Вспомогательный метод:

 private FeedsListModel calculation(FeedsListModel model) {
    // do calculation here

    return new FeedsListModel();
}