Выполнение около 20 HTTP-вызовов и передача данных в базу данных с использованием Java

#java #concurrency #rx-java

#java #параллелизм #rx-java

Вопрос:

У меня есть коллекция из 20 элементов, я создам цикл для элементов и выполню вызовы API для получения данных, на основе возвращенных данных, которые мне нужно будет обновить в базе данных. Это требование простое, и я могу выполнить его на простой Java.

Теперь для повышения производительности я изучаю использование RxJava . Я просмотрел много статей в Интернете и обнаружил, что люди ссылаются на async-http-client библиотеку для асинхронных http-вызовов, я обнаружил, что библиотека устарела, и сопровождающий планирует передать ее кому-то другому, тот, который приведен в библиотеке RxJava, также похож на разработанный в 2014 году. Поскольку я новичок в RxJava, не могли бы вы помочь мне с правильным подходом.

В настоящее время я получаю все данные и преобразую их в наблюдаемые, как показано ниже

 Observable<ENV> envs= Observable.fromIterable(allEnvs);
  

Мне также нужно получить некоторую помощь, например, подходит ли приведенный выше код или я должен создать следующее для наблюдаемой конструкции, это фрагмент в groovy, который мне нужно будет написать на Java.

 val createObserver = Observable.create(ObservableOnSubscribe<String> { emitter ->
    emitter.onNext("Hello World")
    emitter.onComplete()
})
  

Пожалуйста, помогите мне выбрать наилучший подход

Комментарии:

1. извините, вопрос неясен

2. Мне нужно выполнить 20 вызовов API, и я хотел сделать это параллельно, чтобы уменьшить задержку и, возможно, использовать CompletableFuture

Ответ №1:

Представьте, что http-вызов представлен классом ниже :

 public class HttpCall implements Callable<String> {

    private final int i;

    private HttpCall(int i) {
        this.i = i;
    }

    @Override
    public String call() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "Something for : "   i;
    }
}
  

Он ожидает 2 секунды, а затем выдает строку (результат http-вызова).

Чтобы объединить все элементы, полученные в результате разных http-вызовов, мы можем использовать merge operator . Но перед этим нам нужно преобразовать Callable в an Observable с помощью fromCallable operator .

 void sequentially() {
    List<Observable<String>> httpRequests = IntStream.range(0, 20)
            .mapToObj(HttpCall::new)
            .map(Observable::fromCallable)
            .collect(Collectors.toList());

    Observable.merge(httpRequests)
            .timestamp(TimeUnit.SECONDS)
            .subscribe(e -> System.out.println("Elapsed time : "   e.time()   " -- "   e.value()   ". Executed on thread : "   Thread.currentThread().getName()));
}
  

Поскольку все запросы выполняются в одном потоке, порядок сохраняется :

Прошедшее время: 1602122218 — Что-то для : 0. Выполняется в потоке: основное
прошедшее время: 1602122220 — Что-то для: 1. Выполняется в потоке: основное
прошедшее время: 1602122222 — Что-то для : 2. Выполняется в потоке: main

Как вы можете видеть, элементы разделены на 2 секунды.

Чтобы выполнить каждый запрос в своем собственном потоке, нам нужно сообщить Rx, что нам нужен поток для каждого вызова. Проще простого, просто переключитесь на один из предложенных планировщиков. Ввод-вывод — это то, что нам нужно (поскольку это операция ввода-вывода).

 void parallel( {
    List<Observable<String>> httpRequests = IntStream.range(0, 20)
            .mapToObj(HttpCall::new)
            .map(httpCall -> Observable.fromCallable(httpCall)
                                .subscribeOn(Schedulers.io())
            ) // take a thread from the IO pool
            .collect(Collectors.toList());

    Observable.merge(httpRequests)
            .timestamp(TimeUnit.SECONDS)
            .subscribe(e -> System.out.println("Elapsed time : "   e.time()   " -- "   e.value()   ". Executed on thread : "   Thread.currentThread().getName()));
}
  

На этот раз заказ не гарантируется, и они создаются почти одновременно :

Прошедшее время: 1602123707 — Что-то для: 2. Выполняется в потоке: RxCachedThreadScheduler-3
Прошедшее время: 1602123707 — Что-то для : 0. Выполняется в потоке: RxCachedThreadScheduler-1
Прошедшее время: 1602123707 — Что-то для : 1. Выполняется в потоке: RxCachedThreadScheduler-1


Код может быть сокращен следующим образом :

 Observable.range(0, 20)
                .map(HttpCall::new)
                .flatMap(httpCall -> Observable.fromCallable(httpCall).subscribeOn(Schedulers.io()))
                .timestamp(TimeUnit.SECONDS)
                .subscribe(e -> System.out.println("Elapsed time : "   e.time()   " -- "   e.value()   ". Executed on thread : "   Thread.currentThread().getName()));
  

merge использует flatMap за кулисами.