#java #project-reactor
#java #проект-реактор
Вопрос:
Я запустил это:
Mono<Void> mono = Mono.empty();
System.out.println("mono.block: " mono.block());
и он выдает:
mono.block: null
как и ожидалось. Другими словами, вызов block
будет немедленно возвращен, если Mono
он уже завершен.
Еще один пример, напоминающий сценарий реального мира. У меня есть поток источника, например:
Flux<Integer> ints = Flux.range(0, 2);
Я создаю подключаемый поток, который я буду использовать для разрешения нескольких подписчиков:
ConnectableFlux<Integer> publish = ints.publish();
Для этого примера предположим, что есть один реальный подписчик:
publish
.doOnComplete(() -> System.out.println("publish completed"))
.subscribe();
и еще один подписчик, который просто производит подсчет элементов:
Mono<Long> countMono = publish
.doOnComplete(() -> System.out.println("countMono completed"))
.count();
countMono.subscribe();
Я подключаю подключаемый поток и печатаю количество элементов:
publish.connect();
System.out.println("block");
long count = countMono.block();
System.out.println("count: " count);
Это печатает:
publish completed
countMono completed
block
Другими словами, оба подписчика успешно подписываются и завершают, но затем countMono.block()
блокируются на неопределенный срок.
Почему это так и как мне заставить это работать? Моя конечная цель — получить количество элементов.
Ответ №1:
Вы можете заставить это работать, используя autoConnect
или refCount
вместо ручного вызова connect()
.
Например:
Flux<Integer> ints = Flux.range(0, 2);
Flux<Integer> publish = ints.publish()
.autoConnect(2); // new
publish
.doOnComplete(() -> System.out.println("publish completed"))
.subscribe();
Mono<Long> countMono = publish
.doOnComplete(() -> System.out.println("countMono completed"))
.count();
// countMono.subscribe();
long count = countMono.block();
System.out.println("count: " count);
Почему ваш пример не работает?
Вот что, я думаю, происходит в вашем примере… но это основано на моих ограниченных знаниях, и я не уверен на 100%, что это правильно.
.publish()
превращает восходящий источник в горячий поток- Затем вы подписываетесь дважды (но они еще не запускают поток, поскольку подключаемый поток еще не подключен к восходящему потоку)
.connect()
подписывается на восходящий поток и запускает поток- Восходящий поток и две подписки, которые были зарегистрированы до
connect()
завершения (поскольку все это происходит в основном потоке) - На этом этапе подключаемый поток больше не подключен к восходящему потоку, потому что восходящий поток завершен (в документах reactor подробно рассказывается о том, что происходит с подключаемым потоком, когда новые подписки поступают после завершения исходного источника, так что в этом я не уверен на 100%.)
block()
создает новую подписку.- Но поскольку ConnectableFlux больше не подключен, данные не передаются
- Если бы вы снова вызвали
connect()
(из другого потока, поскольку основной поток заблокирован), данные снова поступили бы иblock()
завершились. Однако это будет новая последовательность (не исходная последовательность, которая была завершена на шаге 4)
Почему мой пример работает?
Создаются только две подписки (вместо 3 в вашем примере), одна из .subscribe()
вызова и одна из .block()
. Подключаемый поток автоматически подключается после 2 подписок, и, следовательно block()
, подписка завершается. Обе подписки используют одну и ту же восходящую последовательность.
Комментарии:
1. Спасибо, это имеет смысл. Как бы я это сделал, если
Mono
находится в нижней части цепочки, т. Е. Не Является прямым подписчикомConnectableFlux
? Или, другими словами, есть ли способ получить последний результат от aMono
блокирующим способом?2. Я не совсем понимаю, чего именно вы хотите, но… Чтобы получить значения, полученные из предыдущих подписок, вы могли бы использовать
.cache
(чтобы будущие подписчики видели одно и то же значение в течение определенного периода времени). Или вы можете сохранить отправленные значения где-то за пределами потока (fromdoOnNext
), а затем просмотреть значение из сохраненного местоположения.3. Выше у меня есть
Mono<Long> countMono = publish.count()
. Скажем, что у меня естьFlux<Integer> publish2 = publish.flatMap(e -> whatever)
и тогдаMono<Long> countMono = publish2.count()
. Т.е. Между и есть промежуточный поток (или более одного)publish
countMono
. Вышеизложенное приведетpublish2
к автоматической подписке, но оно может завершиться доcountMono.block()
вызова, что приведет к той же проблеме, верно?cache
будет ли кэшироваться на неопределенный срок, правильно? Может быть проблемой для длительных заданий.