Как создать асинхронный производитель / источник в Java?

#java #stream #rx-java

#java #поток #rx-java

Вопрос:

У меня есть список задач, которые необходимо обработать методом, процесс занимает много времени, поэтому я бы хотел, чтобы задачи в списке обрабатывались одна за другой в асинхронном режиме и возвращали результаты в виде асинхронного потока, поэтому последующей обработке не нужно ждать завершения процесса.весь список задач:

 AsyncStream<R> methodA(List<T> tasks){tasks.forEach(t -> {calculation that takes a long time})}
 

После краткого поиска в Интернете я обнаружил, что RxJava может обрабатывать асинхронные потоковые данные, но введение, похоже, объясняет, как создать асинхронный поток данных. Итак, как создать асинхронный производитель / источник в Java?

Комментарии:

1. Что-то вроде этого здесь? baeldung.com/java-queue . Взгляните на них: ConcurrentLinkedQueue, ArrayBlockingQueue, and ConcurrentLinkedDeque они потокобезопасны.

Ответ №1:

Вы можете создать асинхронный Observable , который будет выдавать значения, как только вычисления для данной задачи будут завершены. Для этого вам понадобится flatMap operator . В упрощенном примере это будет выглядеть так :

 static Observable<String> methodA(List<String> tasks) {
     return Observable.from(tasks)
            .flatMap(t -> Observable.just(t)
                    .map(t1 -> longRunningTask(t1))
                    .subscribeOn(Schedulers.io())
            );

}

static String longRunningTask(String arg) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return arg;
}
 

Вы сопоставляете свои задачи Observable и используете subscribeOn их так, чтобы, когда что-то подписывается на них, подписка происходила в другом потоке. flatMap оператор подписывается на все это Observables сразу и выдает значения, как только они будут готовы. Вычисление является асинхронным, потому что подписка происходит в разных потоках из Scedulers.io пула.

Комментарии:

1. Всегда ли порядок задач сохраняется в последующих результатах? @michalk

2. @Yulin порядок заданий может быть не сохранен — они будут отправлены из результирующего Observable в порядке завершения.