Как вызвать метод блокировки Synchronus внутри асинхронного метода в Java?

#java #reactive-programming #spring-webflux #project-reactor

#java #реактивное программирование #spring-webflux #проект-реактор

Вопрос:

Я использую библиотеку Project Reactor. Вот мой сценарий.

Я хочу вызвать службу блокировки внутри моего неблокирующего метода.

У меня есть три разные службы, я вызвал эти три службы из своего приложения springboot. Вот мой пример кода

 public Mono<Example> getValuesFromDifferentServices() {

  Mono<Example1> mono1=service.getService1();
  Mono<Example2> mono2=service.getService2();

  mono1.zipwith(mono2)
     .map(value-> {
         // some logics then
         if(value.getT1().getStatus().equals(value.getT2().getStatus())) {
           Mono<Example3> mono3 = service.getService3(true);
           mono3.map(f-> {
              value.getT1().setSomething(f.getSomething);
              return f;
           }).subscribe();
         }
       return value.getT1();
     })
}
 

Примечание: Приведенный выше пример не является фактической логикой. Но реализация аналогична этой

Даже я пытался subscribe() это сделать, я не мог все время получать значение 3rd service (значения неопределенности). Я не могу block() использовать 3-ю службу, поскольку она запрещена. Как этого добиться?

Обновление: ввод 3-й службы будет решаться после того, если условие должно быть истинным или нет Mono<Example3> mono3 = service.getService3(true); , мы должны вызвать 3-ю службу, если только условие соответствует, в противном случае вызов 3-й службы не требуется и не рекомендуется., Если условие не соответствует, мы не должны вызывать 3-ю службу.

Комментарии:

1. Тег mono не предназначен для Java.

Ответ №1:

Этот пример немного странный, но, как я понимаю, вы хотите вызвать первые две службы, каждая из которых вернет вам одно значение. После этого вы хотите вызвать третий, если необходимо, и установить значение из него в одно из первых полей.

В любом случае, есть простое решение, но с дополнительной информацией, возможно, мы сможем создать более приятный поток. Этот поток использует flatMap, который охотно подписывается на внутреннего издателя.

[Пример был написан на Kotlin, он очень похож на Java. Единственная путаница здесь, возможно it , переменная, которая равна чему-то вроде этого: map(it -> it.sg ) ]

 data class Example(
    val name: String,
    val status: String,
    var value: String? = null  
)

class ReactorTest {

    @Test
    fun test() {
        val first = Mono.just(Example("first", "suspended"))
        val second = Mono.just(Example("second", "suspended"))
        val third = Mono.just(Example("third", "suspended", "thirdValue"))

        val stream = first.zipWith(second)
            .flatMap { tuple ->
                Mono.just(tuple.t1)
                    .filter { it.status == tuple.t2.status }
                    .zipWith(third)
                    .doOnNext {
                        it.t1.value = it.t2.value
                    }
                    .map { it.t1 }
                    .switchIfEmpty(Mono.just(tuple.t1))
            }

            StepVerifier.create(stream)
                .expectNext(Example("first", "suspended", "thirdValue"))
                .verifyComplete()
    }

    @Test
    fun test2() {
        val first = Mono.just(Example("first", "suspended"))
        val second = Mono.just(Example("second", "active"))
        val third = Mono.just(Example("third", "suspended", "thirdValue"))

        val stream = first.zipWith(second)
            .flatMap { tuple ->
                Mono.just(tuple.t1)
                    .filter { it.status == tuple.t2.status }
                    .zipWith(third)
                    .doOnNext {
                        it.t1.value = it.t2.value
                    }
                    .map { it.t1 }
                    .switchIfEmpty(Mono.just(tuple.t1))
            }

        StepVerifier.create(stream)
            .expectNext(Example("first", "suspended"))
            .verifyComplete()
    }
}
 

Примечание: если вы используете службы блокировки в своих реактивных потоках, они должны быть разделены на выделенные потоки потоков. Нравится:

 fun blockingService(): Mono<String> {
    //real service use fromCallable
    return Mono.just("fromCallableOnServiceCall")
        //for real service it may use a dedicated pool
        .subscribeOn(Schedulers.boundedElastic())

}
 

Комментарии:

1. Я забыл обновить, что ввод третьей службы будет определяться внутри условия If. Обновлен вопрос

2. приведенный выше пример все еще действителен, вы можете использовать zipWith (третью) часть как zipWith(service.getService3(true))

3. Возможно ли отправить true значение входного значения из 1-й / 2-й службы? вместо жесткого кодирования, подобного service.getService3(true) . В основном это было бы service.getService3(mono1's value))

4. .zipWith(tuple.t1) / .zipWith(tuple.t2) .