#java #reactive-programming #project-reactor
#java #реактивное программирование #проект-реактор
Вопрос:
Я изучаю реактивное программирование с помощью project-reactor.
У меня есть следующий тестовый пример:
@Test
public void createAFlux_just() {
Flux<String> fruitFlux = Flux.just("apple", "orange");
fruitFlux.subscribe(f -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(f);
});
System.out.println("hello main thread");
}
При выполнении теста кажется, что основной поток застрял на 5 секунд.
Я ожидал бы, что подписанный потребитель должен выполняться асинхронно в своем собственном потоке, то есть вызов subscribe должен немедленно возвращаться в основной поток и, следовательно hello main thread
, должен печататься мгновенно.
Ответ №1:
Основной поток застрял, потому что подписка происходит в main
потоке. Если вы хотите, чтобы он выполнялся асинхронно, вам нужно, чтобы подписка выполнялась в потоке, отличном от main
. Вы могли бы сделать это как:
@Test
public void createAFlux_just() {
Flux<String> fruitFlux = Flux.just("apple", "orange");
fruitFlux.subscribeOn(Schedulers.parallel()).subscribe(f -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(f);
});
System.out.println("hello main thread");
}
Примечание: я использовал пул parallel
потоков. Вы можете использовать любой пул, который вам нравится. Конвейеры реактора выполняются в вызывающем потоке по умолчанию (в отличие CompletableFuture<T>
от того, который ForkJoin
по умолчанию выполняется в пуле).
Ответ №2:
Такое поведение имело бы место, если бы у вас был наблюдаемый (поток), который был асинхронным. Вы решили использовать поток с двумя легко доступными значениями, используя метод just . Они были переданы объекту подписки сразу, поскольку они были немедленно доступны.
Ответ №3:
из spring.io документация
Операторы реактора потоковой модели обычно не зависят от параллелизма: они не навязывают конкретную потоковую модель и просто запускаются в потоке, в котором был вызван их метод onNext.
Абстракция планировщика В реакторе планировщик — это абстракция, которая дает пользователю контроль над потоками. Планировщик может порождать Worker, которые концептуально являются потоками, но не обязательно поддерживаются потоком (мы увидим пример этого позже). Планировщик также включает в себя понятие часов, тогда как Рабочий — это исключительно планирование задач.
итак, вы должны подписаться на другой поток по subscribeOn
методу, а Thread.sleep(5000)
поток планировщика будет спать. Вы можете увидеть больше примеров, подобных этому, в документации.
Flux.just("hello")
.doOnNext(v -> System.out.println("just " Thread.currentThread().getName()))
.publishOn(Scheduler.boundedElastic())
.doOnNext(v -> System.out.println("publish " Thread.currentThread().getName()))
.delayElements(Duration.ofMillis(500))
.subscribeOn(Schedulers.elastic())
.subscribe(v -> System.out.println(v " delayed " Thread.currentThread().getName()));