Как использовать содержимое Mono в Reactor

#reactive-programming #spring-webflux #project-reactor

#реактивное программирование #spring-webflux #проект-реактор

Вопрос:

Я пытаюсь вызвать реактивный REST API для извлечения переменной deadlinesTS. Затем я пытаюсь установить то же самое в своем классе Pojo. Но значение после установки сроков в BOLCompliance не согласуется с использованием subscribe() . Иногда я могу установить значение, в других случаях я получаю null. Как я могу убедиться, что каждый раз, когда я могу установить значение.

 Mono<String> deadlineTS = portCallServiceCaller.getDeadlineTSByComplianceId(compliance.getId());

BOLCompliance complianceResponse = new BOLCompliance();
deadlineTS.subscribe(val->complianceResponse.setDeadlineTimestamp(val));
 

Комментарии:

1. @lkatiforis: пожалуйста, ответьте

Ответ №1:

Использование содержимого Mono вне реактивного конвейера (блокировка)

Вы можете использовать block() метод, подобный этому:

 Mono<String> nameMono = Mono.just("some-value").delayElement(Duration.ofMillis(300));

Person person = new Person();
person.setName(nameMono.block());

System.out.println(person.getName());
 

Это запускает операцию и ожидает ее завершения. Обратите внимание, что вызывающий поток блокируется.

В качестве альтернативы вы можете использовать subscribe(consumer, errorConsumer, completeConsumer) и предоставить a Runnable , который будет запущен после завершения операции:

 valueMono.subscribe(v-> person.setName(v), throwable -> {}, () -> System.out.println(person.getName()));
 

Однако subscribe() метод немедленно вернется.

Используйте содержимое Mono в реактивном конвейере

Вы можете выбрать один из предоставленных операторов в зависимости от конкретного случая.

В этом случае вы можете использовать map operator для преобразования String в BOLCompliance :

   Mono<BOLCompliance> fetchBOLCompliance() {
    Mono<String> deadlineMono = portCallServiceCaller.getDeadlineTSByComplianceId(compliance.getId();

    return deadlineMono.map(deadline -> {
      BOLCompliance compliance = new BOLCompliance();
      compliance.setDeadlineTimestamp(deadline);
      return compliance;
    });
  }
 

Если вы хотите запустить асинхронную задачу (например, доступ к базе данных), вам нужно будет использовать flatmap operator .

subscribe() возвращает немедленно

Согласно Javadoc:

Disposable subscribe(Consumer<? super T> consumer)

Имейте в виду, что, поскольку последовательность может быть асинхронной, это немедленно вернет управление вызывающему потоку. Это может создать впечатление, что потребитель не вызывается при выполнении, например, в основном потоке или модульном тестировании.

Другими словами, метод subscribe завершает работу и немедленно возвращается. Таким образом, вы не получаете гарантии, что операция выполнена. Например, следующий пример всегда будет иметь нулевое значение:

 Mono<String> nameMono = Mono.just("some-value").delayElement(Duration.ofMillis(300));

Person person = new Person();
nameMono.subscribe(v-> person.setName(v));

System.out.println(person.getName());
 

Здесь person.getName() метод вызывается немедленно, а person.setName(v) вызывается через 300 миллисекунд.

Комментарии:

1. Если я использую block() , я получаю эту ошибку — block() / blockFirst() / blockLast() блокируются, что не поддерживается в thread reactor, как теперь исправить?

2. Внесено редактирование. Вы никогда не должны использовать block() его в реактивном конвейере.

3. @MiraDevi Это работает для вас? Если он отвечает на ваш вопрос, пожалуйста, подумайте о том, чтобы проголосовать за ответ и принять его как правильный, чтобы другие могли извлечь из него пользу и легко понять, какое может быть возможным решением для подобных вопросов.

4. Да, это так, извините, что не проголосовал за это, поскольку я думал, что человек, задавший вопрос, его голос не учитывается. Большое спасибо за решение.