#java #rx-java
#java #rx-java
Вопрос:
Я пытаюсь выполнить итерацию по массиву карт и выполнить некоторые асинхронные действия. Я попробовал несколько вещей, используя библиотеку RxJava, но все, что я пробовал, кажется синхронным. Я пытаюсь избежать создания новых потоков вручную и хочу позволить RxJava обрабатывать это. Это то, что я пробовал до сих пор.
Observable.from(new Map[20])
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.forEach(batch -> {
try {
System.out.println(1);
Thread.sleep(3000);
System.out.println(2);
} catch (Exception e) {
}
});
Observable.from(new Map[20])
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.subscribe(batch -> {
try {
System.out.println(1);
Thread.sleep(3000);
System.out.println(2);
} catch (Exception e) {
}
});
Observable.from(new Map[20])
.subscribeOn(Schedulers.newThread())
.subscribe(batch -> {
try {
System.out.println(1);
Thread.sleep(3000);
System.out.println(2);
} catch (Exception e) {
}
});
Observable.from(new Map[20])
.subscribe(batch -> {
try {
System.out.println(1);
Thread.sleep(3000);
System.out.println(2);
} catch (Exception e) {
}
});
Когда я запускаю модульные тесты с приведенным выше кодом, я вижу следующий результат.
1
2
1
2
1
2
...
Что я хочу видеть, так это
1
1
1
...
2
2
2
Как мне выполнить асинхронную итерацию по массиву Map с использованием RxJava?
Комментарии:
1. Асинхронный не означает неупорядоченный . Когда вы применяете
Scheduler
s, операции выполняются для указанногоScheduler
вами, но результатом по-прежнему является поток, имеющий порядок.
Ответ №1:
Вы можете добиться его изменения с наблюдаемого на плавный и использовать параллельный:
Flowable.fromIterable(array)
.parallel(3) // number of items in parallel
.runOn(Schedulers.newThread()) // the desired scheduler
.map(item -> {
try {
System.out.println(1);
Thread.sleep(3000);
System.out.println(2);
} catch (Exception e) {
}
return Completable.complete();
})
.sequential().subscribe();
Комментарии:
1. Это сделало свое дело. Я использую
doOnNext
,doOnError
, иdoOnComplete
для выполнения действий и ведения журнала.
Ответ №2:
Если вы застряли с использованием RxJava 1.x, у вас не будет доступа к классу Flowable. Это был не мой случай, но что-то вроде приведенного ниже кода может выполнять параллельные действия. Вложенности больше, но это работает.
final ExecutorService executor = Executors.newFixedThreadPool(2);
List<String> iterableList = new ArrayList<>();
iterableList.add("one");
iterableList.add("two");
iterableList.add("three");
iterableList.add("4");
iterableList.add("5");
iterableList.add("6");
iterableList.add("7");
iterableList.add("8");
iterableList.add("9");
Observable.from(iterableList)
.flatMap(val -> Observable.just(val)
.subscribeOn(Schedulers.from(executor))
.doOnNext(numString -> {
try {
System.out.println(1);
Thread.sleep(500);
System.out.println(2);
} catch (Exception ex) {
}
})
)
.subscribe();