Написание императивного кода в Project Reactor

#spring #spring-webflux #project-reactor #spring-mono

Вопрос:

У меня есть класс со следующими двумя методами.

 public class Test1 {
    public Mono<String> blah1() {
        Mono<String> blah = Mono.just("blah1");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("blah1 done");
        return blah;
    }
    
    public Mono<String> blah2() {
        Mono<String> blah = Mono.just("blah2");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("blah2 done");
        return blah;
    }
}
 

У меня есть следующий джунит:

 @Test
public void blah1Test() {
    Flux<Tuple2<String, String>> s = Flux.zip(test1.blah1(), test1.blah2());    
}
 

Мой результат выглядит следующим образом:

 blah1 done
blah2 done
 

Я ожидаю, что бла-2 закончится до бла-1. Таким образом, я считаю, что это блокировка обработки, а не неблокирующая. Что мне нужно сделать, чтобы переключить выход на blah2, а затем на blah1? В принципе, почему они не обрабатываются параллельно?

Заранее спасибо за потраченное время!

Ответ №1:

Оба sleep и println выполняются вне реактивного конвейера. Поэтому оба blah1() и blah2() ведут себя как обычные, нереактивные методы.

Попробуй это:

 public Mono<String> blah1() {
    System.out.println("blah1 start");
    return Mono.just("blah1")
        .delayElement(Duration.ofMillis(5000))
        .doOnNext(e -> System.out.println("blah1 done"));
}

public Mono<String> blah2() {
    System.out.println("blah2 start");
    return Mono.just("blah2")
        .delayElement(Duration.ofMillis(1000))
        .doOnNext(e -> System.out.println("blah2 done"));
}
 

Здесь мы имеем ожидаемый результат, потому что печать выполняется в реактивном конвейере.

Выход:

 blah1  start 
blah2 start 
blah2 done
blah1 done
 

Комментарии:

1. Я пытаюсь медленно перенести большое приложение в spring reactive с течением времени. Есть ли способ сделать так, чтобы я мог обрабатывать методы, код которых находится вне реактивного конвейера, все еще используя моно/поток? У меня есть много завершаемых функций, но я бы предпочел пока изменить их на Mono/flux, а код внутри метода изменить позже.

2. Возможно, я ответил на свой собственный вопрос. Похоже, я могу ввести код . doOnNext, и он будет обрабатываться в реактивном конвейере. Это точно?

3. @Брайан, конечно doOnNext , будет работать, но не для всех случаев. Не рекомендуется изменять состояние приложения или выполнять там асинхронную операцию.

4. Большая часть моего приложения асинхронна и уже использует завершаемые функции, но не для всего. Спасибо за всю большую помощь!