Блок Mono.count на неопределенный срок

#java #project-reactor

#java #проект-реактор

Вопрос:

Я запустил это:

 Mono<Void> mono = Mono.empty();

System.out.println("mono.block: "   mono.block());
  

и он выдает:

 mono.block: null    
  

как и ожидалось. Другими словами, вызов block будет немедленно возвращен, если Mono он уже завершен.

Еще один пример, напоминающий сценарий реального мира. У меня есть поток источника, например:

 Flux<Integer> ints = Flux.range(0, 2);
  

Я создаю подключаемый поток, который я буду использовать для разрешения нескольких подписчиков:

 ConnectableFlux<Integer> publish = ints.publish();
  

Для этого примера предположим, что есть один реальный подписчик:

 publish
   .doOnComplete(() -> System.out.println("publish completed"))
   .subscribe();
  

и еще один подписчик, который просто производит подсчет элементов:

 Mono<Long> countMono = publish
   .doOnComplete(() -> System.out.println("countMono completed"))
   .count();

countMono.subscribe();
  

Я подключаю подключаемый поток и печатаю количество элементов:

 publish.connect();

System.out.println("block");

long count = countMono.block();

System.out.println("count: "   count);
  

Это печатает:

 publish completed
countMono completed
block
  

Другими словами, оба подписчика успешно подписываются и завершают, но затем countMono.block() блокируются на неопределенный срок.

Почему это так и как мне заставить это работать? Моя конечная цель — получить количество элементов.

Ответ №1:

Вы можете заставить это работать, используя autoConnect или refCount вместо ручного вызова connect() .

Например:

         Flux<Integer> ints = Flux.range(0, 2);
        Flux<Integer> publish = ints.publish()
                .autoConnect(2);  // new 
        publish
                .doOnComplete(() -> System.out.println("publish completed"))
                .subscribe();
        Mono<Long> countMono = publish
                .doOnComplete(() -> System.out.println("countMono completed"))
                .count();
        // countMono.subscribe();
        long count = countMono.block();
        System.out.println("count: "   count);
  

Почему ваш пример не работает?

Вот что, я думаю, происходит в вашем примере… но это основано на моих ограниченных знаниях, и я не уверен на 100%, что это правильно.

  1. .publish() превращает восходящий источник в горячий поток
  2. Затем вы подписываетесь дважды (но они еще не запускают поток, поскольку подключаемый поток еще не подключен к восходящему потоку)
  3. .connect() подписывается на восходящий поток и запускает поток
  4. Восходящий поток и две подписки, которые были зарегистрированы до connect() завершения (поскольку все это происходит в основном потоке)
  5. На этом этапе подключаемый поток больше не подключен к восходящему потоку, потому что восходящий поток завершен (в документах reactor подробно рассказывается о том, что происходит с подключаемым потоком, когда новые подписки поступают после завершения исходного источника, так что в этом я не уверен на 100%.)
  6. block() создает новую подписку.
  7. Но поскольку ConnectableFlux больше не подключен, данные не передаются
  8. Если бы вы снова вызвали connect() (из другого потока, поскольку основной поток заблокирован), данные снова поступили бы и block() завершились. Однако это будет новая последовательность (не исходная последовательность, которая была завершена на шаге 4)

Почему мой пример работает?

Создаются только две подписки (вместо 3 в вашем примере), одна из .subscribe() вызова и одна из .block() . Подключаемый поток автоматически подключается после 2 подписок, и, следовательно block() , подписка завершается. Обе подписки используют одну и ту же восходящую последовательность.

Комментарии:

1. Спасибо, это имеет смысл. Как бы я это сделал, если Mono находится в нижней части цепочки, т. Е. Не Является прямым подписчиком ConnectableFlux ? Или, другими словами, есть ли способ получить последний результат от a Mono блокирующим способом?

2. Я не совсем понимаю, чего именно вы хотите, но… Чтобы получить значения, полученные из предыдущих подписок, вы могли бы использовать .cache (чтобы будущие подписчики видели одно и то же значение в течение определенного периода времени). Или вы можете сохранить отправленные значения где-то за пределами потока (from doOnNext ), а затем просмотреть значение из сохраненного местоположения.

3. Выше у меня есть Mono<Long> countMono = publish.count() . Скажем, что у меня есть Flux<Integer> publish2 = publish.flatMap(e -> whatever) и тогда Mono<Long> countMono = publish2.count() . Т.е. Между и есть промежуточный поток (или более одного) publish countMono . Вышеизложенное приведет publish2 к автоматической подписке, но оно может завершиться до countMono.block() вызова, что приведет к той же проблеме, верно? cache будет ли кэшироваться на неопределенный срок, правильно? Может быть проблемой для длительных заданий.