Асинхронный поиск по массиву с помощью RxJava

#java #rx-java

#java #rx-java

Вопрос:

Я пытаюсь выполнить итерацию по массиву карт и выполнить некоторые асинхронные действия. Я попробовал несколько вещей, используя библиотеку RxJava, но все, что я пробовал, кажется синхронным. Я пытаюсь избежать создания новых потоков вручную и хочу позволить RxJava обрабатывать это. Это то, что я пробовал до сих пор.

 Observable.from(new Map[20])
            .subscribeOn(Schedulers.newThread()) 
            .observeOn(Schedulers.computation())
            .forEach(batch -> {
                try {
                    System.out.println(1);
                    Thread.sleep(3000);
                    System.out.println(2);
                } catch (Exception e) {

                }
            });
 

     Observable.from(new Map[20])
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.computation())
            .subscribe(batch -> {
                try {
                    System.out.println(1);
                    Thread.sleep(3000);
                    System.out.println(2);
                } catch (Exception e) {

                }
            });
 

     Observable.from(new Map[20])
            .subscribeOn(Schedulers.newThread())
            .subscribe(batch -> {
                try {
                    System.out.println(1);
                    Thread.sleep(3000);
                    System.out.println(2);
                } catch (Exception e) {

                }
            });
 

     Observable.from(new Map[20])
            .subscribe(batch -> {
                try {
                    System.out.println(1);
                    Thread.sleep(3000);
                    System.out.println(2);
                } catch (Exception e) {

                }
            });
 

Когда я запускаю модульные тесты с приведенным выше кодом, я вижу следующий результат.

 1
2
1
2
1
2
...
 

Что я хочу видеть, так это

 1
1
1
... 
2
2
2
 

Как мне выполнить асинхронную итерацию по массиву Map с использованием RxJava?

Комментарии:

1. Асинхронный не означает неупорядоченный . Когда вы применяете Scheduler s, операции выполняются для указанного Scheduler вами, но результатом по-прежнему является поток, имеющий порядок.

Ответ №1:

Вы можете добиться его изменения с наблюдаемого на плавный и использовать параллельный:

         Flowable.fromIterable(array)
                .parallel(3) // number of items in parallel
                .runOn(Schedulers.newThread()) // the desired scheduler
                .map(item -> {
                    try {
                        System.out.println(1);
                        Thread.sleep(3000);
                        System.out.println(2);
                    } catch (Exception e) {

                    }

                    return Completable.complete();
                })
        .sequential().subscribe();
 

Комментарии:

1. Это сделало свое дело. Я использую doOnNext , doOnError , и doOnComplete для выполнения действий и ведения журнала.

Ответ №2:

Если вы застряли с использованием RxJava 1.x, у вас не будет доступа к классу Flowable. Это был не мой случай, но что-то вроде приведенного ниже кода может выполнять параллельные действия. Вложенности больше, но это работает.

     final ExecutorService executor = Executors.newFixedThreadPool(2);
    List<String> iterableList = new ArrayList<>();
    iterableList.add("one");
    iterableList.add("two");
    iterableList.add("three");
    iterableList.add("4");
    iterableList.add("5");
    iterableList.add("6");
    iterableList.add("7");
    iterableList.add("8");
    iterableList.add("9");
    Observable.from(iterableList)
            .flatMap(val -> Observable.just(val)
                    .subscribeOn(Schedulers.from(executor))
                    .doOnNext(numString -> {
                        try {
                            System.out.println(1);
                            Thread.sleep(500);
                            System.out.println(2);
                        } catch (Exception ex) {
                        }
                    })
            )
            .subscribe();