Реактивное программирование (реактор): почему застрял основной поток?

#java #reactive-programming #project-reactor

#java #реактивное программирование #проект-реактор

Вопрос:

Я изучаю реактивное программирование с помощью project-reactor.

У меня есть следующий тестовый пример:

 @Test
public void createAFlux_just() {
    Flux<String> fruitFlux = Flux.just("apple", "orange");
    fruitFlux.subscribe(f -> {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(f);
    });
    System.out.println("hello main thread");
}
  

При выполнении теста кажется, что основной поток застрял на 5 секунд.

Я ожидал бы, что подписанный потребитель должен выполняться асинхронно в своем собственном потоке, то есть вызов subscribe должен немедленно возвращаться в основной поток и, следовательно hello main thread , должен печататься мгновенно.

Ответ №1:

Основной поток застрял, потому что подписка происходит в main потоке. Если вы хотите, чтобы он выполнялся асинхронно, вам нужно, чтобы подписка выполнялась в потоке, отличном от main . Вы могли бы сделать это как:

  @Test
public void createAFlux_just() {
    Flux<String> fruitFlux = Flux.just("apple", "orange");
    fruitFlux.subscribeOn(Schedulers.parallel()).subscribe(f -> {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(f);
    });
    System.out.println("hello main thread");
}
  

Примечание: я использовал пул parallel потоков. Вы можете использовать любой пул, который вам нравится. Конвейеры реактора выполняются в вызывающем потоке по умолчанию (в отличие CompletableFuture<T> от того, который ForkJoin по умолчанию выполняется в пуле).

Ответ №2:

Такое поведение имело бы место, если бы у вас был наблюдаемый (поток), который был асинхронным. Вы решили использовать поток с двумя легко доступными значениями, используя метод just . Они были переданы объекту подписки сразу, поскольку они были немедленно доступны.

Ответ №3:

из spring.io документация

Операторы реактора потоковой модели обычно не зависят от параллелизма: они не навязывают конкретную потоковую модель и просто запускаются в потоке, в котором был вызван их метод onNext.

Абстракция планировщика В реакторе планировщик — это абстракция, которая дает пользователю контроль над потоками. Планировщик может порождать Worker, которые концептуально являются потоками, но не обязательно поддерживаются потоком (мы увидим пример этого позже). Планировщик также включает в себя понятие часов, тогда как Рабочий — это исключительно планирование задач.

итак, вы должны подписаться на другой поток по subscribeOn методу, а Thread.sleep(5000) поток планировщика будет спать. Вы можете увидеть больше примеров, подобных этому, в документации.

 Flux.just("hello")
    .doOnNext(v -> System.out.println("just "   Thread.currentThread().getName()))
    .publishOn(Scheduler.boundedElastic())
    .doOnNext(v -> System.out.println("publish "   Thread.currentThread().getName()))
    .delayElements(Duration.ofMillis(500))
    .subscribeOn(Schedulers.elastic())
    .subscribe(v -> System.out.println(v   " delayed "   Thread.currentThread().getName()));