#reactive-programming #spring-webflux #project-reactor
#реактивное программирование #spring-webflux #проект-реактор
Вопрос:
Я новичок в spring webflux, и мое текущее приложение spring boot использует планировщик (с пометкой @Scheduled) для чтения списка данных из БД, одновременного пакетного вызова rest api, а затем записи в поток событий. Я хочу добиться того же в Spring webflux.
- Должен ли я использовать @Scheduled или использовать schedulePeriodically из Webflux?
- Как я могу разбивать элементы из БД на меньшие наборы (скажем, 10 элементов) и одновременно вызывать rest api?
- В настоящее время приложение извлекает максимум 100 записей за один запуск планировщика, а затем обрабатывает их. Я планирую перейти на r2dbc, если я это сделаю, могу ли я ограничить поток данных, например, 100?
Спасибо
Ответ №1:
1. Должен ли я использовать @Scheduled или использовать schedulePeriodically из Webflux?
@Scheduled
это аннотация, которая является частью запланированного пакета spring framework, в то время как schedulePeriodically — это функция, которая является частью reactor, поэтому вы не можете сравнивать их. Я не вижу никаких проблем в использовании аннотации, поскольку она является частью основного фреймворка.
2. Как я могу разбивать элементы из БД на меньшие наборы (скажем, 10 элементов) и одновременно вызывать rest api?
Используя Flux#buffer
функции, которые будут выдавать список элементов, когда буфер будет заполнен.
Flux.just("1", "2", "3", "4")
.buffer(2)
.doOnNext(list -> {
System.out.println(list.size());
}).subscribe()
Будет печатать 2 каждый раз.
3. В настоящее время приложение извлекает не более 100 записей за один запуск планировщика, а затем обрабатывает их. Я планирую перейти на r2dbc, если я это сделаю, могу ли я ограничить поток данных, например, 100?
Ну, вы можете, как было написано ранее, извлекать, а затем буферизировать ответы в списки по 100, затем вы можете поместить каждый список в свой собственный поток и снова выдавать элементы или обрабатывать каждый список из 100 элементов. Зависит от вас.
В сегменте буфера есть много функций, ознакомьтесь с ними.
Комментарии:
1. спасибо за ваш ответ на Erktumlare. Я беспокоюсь, что буфер поместит все в память, что может привести к ООМ. Как насчет методов ограничения? Приведенный выше пример кода не компилируется, поскольку нет такого метода, как doSuccess. (rector-core: 3.3.5.RELEASE) Если я хочу разделить 100 записей из БД на более мелкие фрагменты по 10 в каждом, которые будут обрабатываться одновременно в отдельных планировщиках, как я могу этого добиться? Метод buffer(int) возвращает List<Список<Объект>> и нужно ли мне использовать противодавление в этом случае? или если я заставлю свой @Scheduled запускаться с фиксированной задержкой?
2. Нет, функция не вызывается
doSucces
. Они обновили apidoOnNext
. Если у вас есть еще несколько вопросов, вам нужно начать новый вопрос. Но я могу сказать вам, почему вы хотите разместить его в 10 разных планировщиках? Прочитайте о reactor и его функциях, прежде чем задавать такие вопросы.flatMap
уже асинхронно, поэтому я не вижу причин, по которым unleass вы выполняете тяжелые математические вычисления, которые затем вы должны использовать parallell flux.3. Я предлагаю вам начать программировать и выполнять некоторые базовые вещи в reactor, прежде чем задавать все эти вопросы. Прочитайте начало работы с reactor в официальной документации. Там объясняется, как вы должны развиваться, чтобы воспользоваться его асинхронными возможностями, и он ответит на большинство ваших вопросов. Затем, когда у вас возникнут проблемы с разработкой, вы можете задать конкретные вопросы с кодом здесь.
4. В документации будет объяснено, что такое противодавление, когда его использовать и как вы его используете. Так что читайте документы.
Ответ №2:
Flux.buffer объединит потоки и выдаст список потоков указанного размера буфера. Для пакетной обработки вы можете использовать Flux.expand или Mono.expand. Вам нужно только указать свое условие в расширении, чтобы выполнить его снова или окончательно завершить. Вот несколько примеров:
public static void main(String[] args) {
List<String> list = new ArrayList<>();
list.add("1");
Flux.just(list)
.buffer(2)
.doOnNext(ls -> {
System.out.println(ls.getClass());
// Buffering a list returns the list of list of String
System.out.println(ls);
}).subscribe();
Flux.just(list)
.expand(listObj -> {
// Condition to finally end the batch
if (listObj.size()>4) {
return Flux.empty();
}
// Can return the size of data as much as you require
list.add("a");
return Flux.just(listObj);
}).map(ls -> {
// Here it returns list of String which was the original object type not list of list as in case of buffer
System.out.println(ls.getClass());
System.out.println(ls);
return ls;
}).subscribe();
}
Output:
class java.util.ArrayList
[[1]] /// Output of buffer list of list
class java.util.ArrayList
[1]
class java.util.ArrayList
[1, a]
class java.util.ArrayList
[1, a, a]
class java.util.ArrayList
[1, a, a, a]
class java.util.ArrayList
[1, a, a, a, a]
Комментарии:
1. Большое спасибо @Shubham1932, извините, что я поздно увидел ваш пост. Я попробую это. Спасибо