#java #concurrency #rx-java
#java #параллелизм #rx-java
Вопрос:
У меня есть коллекция из 20 элементов, я создам цикл для элементов и выполню вызовы API для получения данных, на основе возвращенных данных, которые мне нужно будет обновить в базе данных. Это требование простое, и я могу выполнить его на простой Java.
Теперь для повышения производительности я изучаю использование RxJava
. Я просмотрел много статей в Интернете и обнаружил, что люди ссылаются на async-http-client
библиотеку для асинхронных http-вызовов, я обнаружил, что библиотека устарела, и сопровождающий планирует передать ее кому-то другому, тот, который приведен в библиотеке RxJava, также похож на разработанный в 2014 году. Поскольку я новичок в RxJava, не могли бы вы помочь мне с правильным подходом.
В настоящее время я получаю все данные и преобразую их в наблюдаемые, как показано ниже
Observable<ENV> envs= Observable.fromIterable(allEnvs);
Мне также нужно получить некоторую помощь, например, подходит ли приведенный выше код или я должен создать следующее для наблюдаемой конструкции, это фрагмент в groovy, который мне нужно будет написать на Java.
val createObserver = Observable.create(ObservableOnSubscribe<String> { emitter ->
emitter.onNext("Hello World")
emitter.onComplete()
})
Пожалуйста, помогите мне выбрать наилучший подход
Комментарии:
1. извините, вопрос неясен
2. Мне нужно выполнить 20 вызовов API, и я хотел сделать это параллельно, чтобы уменьшить задержку и, возможно, использовать
CompletableFuture
Ответ №1:
Представьте, что http-вызов представлен классом ниже :
public class HttpCall implements Callable<String> {
private final int i;
private HttpCall(int i) {
this.i = i;
}
@Override
public String call() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Something for : " i;
}
}
Он ожидает 2 секунды, а затем выдает строку (результат http-вызова).
Чтобы объединить все элементы, полученные в результате разных http-вызовов, мы можем использовать merge
operator . Но перед этим нам нужно преобразовать Callable
в an Observable
с помощью fromCallable
operator .
void sequentially() {
List<Observable<String>> httpRequests = IntStream.range(0, 20)
.mapToObj(HttpCall::new)
.map(Observable::fromCallable)
.collect(Collectors.toList());
Observable.merge(httpRequests)
.timestamp(TimeUnit.SECONDS)
.subscribe(e -> System.out.println("Elapsed time : " e.time() " -- " e.value() ". Executed on thread : " Thread.currentThread().getName()));
}
Поскольку все запросы выполняются в одном потоке, порядок сохраняется :
Прошедшее время: 1602122218 — Что-то для : 0. Выполняется в потоке: основное
прошедшее время: 1602122220 — Что-то для: 1. Выполняется в потоке: основное
прошедшее время: 1602122222 — Что-то для : 2. Выполняется в потоке: main
…
Как вы можете видеть, элементы разделены на 2 секунды.
Чтобы выполнить каждый запрос в своем собственном потоке, нам нужно сообщить Rx, что нам нужен поток для каждого вызова. Проще простого, просто переключитесь на один из предложенных планировщиков. Ввод-вывод — это то, что нам нужно (поскольку это операция ввода-вывода).
void parallel( {
List<Observable<String>> httpRequests = IntStream.range(0, 20)
.mapToObj(HttpCall::new)
.map(httpCall -> Observable.fromCallable(httpCall)
.subscribeOn(Schedulers.io())
) // take a thread from the IO pool
.collect(Collectors.toList());
Observable.merge(httpRequests)
.timestamp(TimeUnit.SECONDS)
.subscribe(e -> System.out.println("Elapsed time : " e.time() " -- " e.value() ". Executed on thread : " Thread.currentThread().getName()));
}
На этот раз заказ не гарантируется, и они создаются почти одновременно :
Прошедшее время: 1602123707 — Что-то для: 2. Выполняется в потоке: RxCachedThreadScheduler-3
Прошедшее время: 1602123707 — Что-то для : 0. Выполняется в потоке: RxCachedThreadScheduler-1
Прошедшее время: 1602123707 — Что-то для : 1. Выполняется в потоке: RxCachedThreadScheduler-1
…
Код может быть сокращен следующим образом :
Observable.range(0, 20)
.map(HttpCall::new)
.flatMap(httpCall -> Observable.fromCallable(httpCall).subscribeOn(Schedulers.io()))
.timestamp(TimeUnit.SECONDS)
.subscribe(e -> System.out.println("Elapsed time : " e.time() " -- " e.value() ". Executed on thread : " Thread.currentThread().getName()));
merge
использует flatMap
за кулисами.