Spring webflux / reactor использует @Scheduled для чтения базы данных и выполнения некоторых задач

#reactive-programming #spring-webflux #project-reactor

#реактивное программирование #spring-webflux #проект-реактор

Вопрос:

Я новичок в spring webflux, и мое текущее приложение spring boot использует планировщик (с пометкой @Scheduled) для чтения списка данных из БД, одновременного пакетного вызова rest api, а затем записи в поток событий. Я хочу добиться того же в Spring webflux.

  1. Должен ли я использовать @Scheduled или использовать schedulePeriodically из Webflux?
  2. Как я могу разбивать элементы из БД на меньшие наборы (скажем, 10 элементов) и одновременно вызывать rest api?
  3. В настоящее время приложение извлекает максимум 100 записей за один запуск планировщика, а затем обрабатывает их. Я планирую перейти на r2dbc, если я это сделаю, могу ли я ограничить поток данных, например, 100?

Спасибо

Ответ №1:

1. Должен ли я использовать @Scheduled или использовать schedulePeriodically из Webflux?

@Scheduled это аннотация, которая является частью запланированного пакета spring framework, в то время как schedulePeriodically — это функция, которая является частью reactor, поэтому вы не можете сравнивать их. Я не вижу никаких проблем в использовании аннотации, поскольку она является частью основного фреймворка.

2. Как я могу разбивать элементы из БД на меньшие наборы (скажем, 10 элементов) и одновременно вызывать rest api?

Используя Flux#buffer функции, которые будут выдавать список элементов, когда буфер будет заполнен.

 Flux.just("1", "2", "3", "4")
        .buffer(2)
        .doOnNext(list -> {
            System.out.println(list.size());
        }).subscribe()
 

Будет печатать 2 каждый раз.

3. В настоящее время приложение извлекает не более 100 записей за один запуск планировщика, а затем обрабатывает их. Я планирую перейти на r2dbc, если я это сделаю, могу ли я ограничить поток данных, например, 100?

Ну, вы можете, как было написано ранее, извлекать, а затем буферизировать ответы в списки по 100, затем вы можете поместить каждый список в свой собственный поток и снова выдавать элементы или обрабатывать каждый список из 100 элементов. Зависит от вас.

В сегменте буфера есть много функций, ознакомьтесь с ними.

Поток #буфер

Комментарии:

1. спасибо за ваш ответ на Erktumlare. Я беспокоюсь, что буфер поместит все в память, что может привести к ООМ. Как насчет методов ограничения? Приведенный выше пример кода не компилируется, поскольку нет такого метода, как doSuccess. (rector-core: 3.3.5.RELEASE) Если я хочу разделить 100 записей из БД на более мелкие фрагменты по 10 в каждом, которые будут обрабатываться одновременно в отдельных планировщиках, как я могу этого добиться? Метод buffer(int) возвращает List<Список<Объект>> и нужно ли мне использовать противодавление в этом случае? или если я заставлю свой @Scheduled запускаться с фиксированной задержкой?

2. Нет, функция не вызывается doSucces . Они обновили api doOnNext . Если у вас есть еще несколько вопросов, вам нужно начать новый вопрос. Но я могу сказать вам, почему вы хотите разместить его в 10 разных планировщиках? Прочитайте о reactor и его функциях, прежде чем задавать такие вопросы. flatMap уже асинхронно, поэтому я не вижу причин, по которым unleass вы выполняете тяжелые математические вычисления, которые затем вы должны использовать parallell flux.

3. Я предлагаю вам начать программировать и выполнять некоторые базовые вещи в reactor, прежде чем задавать все эти вопросы. Прочитайте начало работы с reactor в официальной документации. Там объясняется, как вы должны развиваться, чтобы воспользоваться его асинхронными возможностями, и он ответит на большинство ваших вопросов. Затем, когда у вас возникнут проблемы с разработкой, вы можете задать конкретные вопросы с кодом здесь.

4. В документации будет объяснено, что такое противодавление, когда его использовать и как вы его используете. Так что читайте документы.

Ответ №2:

Flux.buffer объединит потоки и выдаст список потоков указанного размера буфера. Для пакетной обработки вы можете использовать Flux.expand или Mono.expand. Вам нужно только указать свое условие в расширении, чтобы выполнить его снова или окончательно завершить. Вот несколько примеров:

     public static void main(String[] args) {
        List<String> list = new ArrayList<>();
        list.add("1");
        
        
        Flux.just(list)
        .buffer(2)
        .doOnNext(ls -> {
            System.out.println(ls.getClass());
            // Buffering a list returns the list of list of String
            System.out.println(ls);
        }).subscribe();
        
        Flux.just(list)
        .expand(listObj -> {
            // Condition to finally end the batch 
            if (listObj.size()>4) {
                return Flux.empty();
            }
            // Can return the size of data as much as you require
            list.add("a");
            return Flux.just(listObj);
        }).map(ls -> {
            // Here it returns list of String which was the original object type not list of list as in case of buffer
            System.out.println(ls.getClass());
            System.out.println(ls);
            return ls;
        }).subscribe();
    }


Output:

class java.util.ArrayList
[[1]]            /// Output of buffer list of list 
class java.util.ArrayList
[1]
class java.util.ArrayList
[1, a]
class java.util.ArrayList
[1, a, a]
class java.util.ArrayList
[1, a, a, a]
class java.util.ArrayList
[1, a, a, a, a]
 

Комментарии:

1. Большое спасибо @Shubham1932, извините, что я поздно увидел ваш пост. Я попробую это. Спасибо