#java #stream #rx-java
#java #поток #rx-java
Вопрос:
У меня есть список задач, которые необходимо обработать методом, процесс занимает много времени, поэтому я бы хотел, чтобы задачи в списке обрабатывались одна за другой в асинхронном режиме и возвращали результаты в виде асинхронного потока, поэтому последующей обработке не нужно ждать завершения процесса.весь список задач:
AsyncStream<R> methodA(List<T> tasks){tasks.forEach(t -> {calculation that takes a long time})}
После краткого поиска в Интернете я обнаружил, что RxJava может обрабатывать асинхронные потоковые данные, но введение, похоже, объясняет, как создать асинхронный поток данных. Итак, как создать асинхронный производитель / источник в Java?
Комментарии:
1. Что-то вроде этого здесь? baeldung.com/java-queue . Взгляните на них:
ConcurrentLinkedQueue, ArrayBlockingQueue, and ConcurrentLinkedDeque
они потокобезопасны.
Ответ №1:
Вы можете создать асинхронный Observable
, который будет выдавать значения, как только вычисления для данной задачи будут завершены. Для этого вам понадобится flatMap
operator . В упрощенном примере это будет выглядеть так :
static Observable<String> methodA(List<String> tasks) {
return Observable.from(tasks)
.flatMap(t -> Observable.just(t)
.map(t1 -> longRunningTask(t1))
.subscribeOn(Schedulers.io())
);
}
static String longRunningTask(String arg) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return arg;
}
Вы сопоставляете свои задачи Observable
и используете subscribeOn
их так, чтобы, когда что-то подписывается на них, подписка происходила в другом потоке. flatMap
оператор подписывается на все это Observables
сразу и выдает значения, как только они будут готовы. Вычисление является асинхронным, потому что подписка происходит в разных потоках из Scedulers.io
пула.
Комментарии:
1. Всегда ли порядок задач сохраняется в последующих результатах? @michalk
2. @Yulin порядок заданий может быть не сохранен — они будут отправлены из результирующего
Observable
в порядке завершения.