Как выполнить асинхронно внутри стрелки понимание монады реактора

#kotlin #project-reactor #arrow-kt

#kotlin #проект-реактор #стрелка-kt

Вопрос:

В следующем фрагменте кода каждый helloX() метод выполняется асинхронно (это отложенный Mono, который выполняется в отдельном потоке), см. Полный код ниже):

     override fun helloEverybody(): Kind<ForMonoK, String> {
        return MonoK.monad().fx.monad {
            val j = !helloJoey()
            val j2 = !helloJohn()
            val j3 = !helloMary()
            "$j and $j2 and $j3"
        }.fix()
    }
  

Однако в журналах я вижу, что они выполняются секуционально:

 14:10:46.983 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
14:10:47.084 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJoey()
14:10:49.087 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJoey() - ready
14:10:49.090 [elastic-3] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJohn()
14:10:54.091 [elastic-3] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJohn() - ready
14:10:54.092 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloMary()
14:10:59.095 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloMary() - ready
hello Joey and hello John and hello Mary
  

Как я мог бы заставить их выполняться параллельно и агрегировать все результаты в понимании монады после того, как все они будут завершены?

Полный код с основным методом ():

 class HelloServiceImpl : HelloService<ForMonoK> {

    private val logger = LoggerFactory.getLogger(javaClass)

    override fun helloEverybody(): Kind<ForMonoK, String> {
        return MonoK.monad().fx.monad {
            val j = !helloJoey()
            val j2 = !helloJohn()
            val j3 = !helloMary()
            "$j and $j2 and $j3"
        }.fix()
    }

    override fun helloJoey(): Kind<ForMonoK, String> {
        return Mono.defer {
            logger.info("helloJoey()")
            sleep(2000)
            logger.info("helloJoey() - ready")
            Mono.just("hello Joey")
        }.subscribeOn(Schedulers.elastic()).k()
    }

    override fun helloJohn(): Kind<ForMonoK, String> {
        return Mono.defer {
            logger.info("helloJohn()")
            sleep(5000)
            logger.info("helloJohn() - ready")
            Mono.just("hello John")
        }.subscribeOn(Schedulers.elastic()).k()
    }

    override fun helloMary(): Kind<ForMonoK, String> {
        return Mono.defer {
            logger.info("helloMary()")
            sleep(5000)
            logger.info("helloMary() - ready")
            Mono.just("hello Mary")
        }.subscribeOn(Schedulers.elastic()).k()
    }

}

fun main() {
    val countDownLatch = CountDownLatch(1)
    HelloServiceImpl().helloEverybody().fix().mono.subscribe {
        println(it)
        countDownLatch.countDown()
    }
    countDownLatch.await()
}
  

Обновить

Я адаптировал метод для объединения последовательной операции с параллельной:

     override fun helloEverybody(): Kind<ForMonoK, String> {
        return MonoK.async().fx.async {
            val j = helloJoey().bind()
            val j2= Dispatchers.IO
                    .parMapN(helloJohn(), helloMary()){ it1, it2 -> "$it1 and $it2" }
            "$j and $j2"
        }
    }
  

К сожалению, parMapN нельзя использовать с ForMonoK:

 Type inference failed: fun <A, B, C, D> CoroutineContext.parMapN(fa: Kind<ForIO, A>, fb: Kind<ForIO, B>, fc: Kind<ForIO, C>, f: (A, B, C) -> D): IO<D>
cannot be applied to
receiver: CoroutineDispatcher  arguments: (Kind<ForMonoK, String>,Kind<ForMonoK, String>,Kind<ForMonoK, String>,(String, String, String) -> String)
  

Идеи?

Ответ №1:

flatMap то же, map что и , не имеет семантики потоков или параллелизма. То, что вам нужно, называется parMap and parTraverse , которое выполняется несколько MonoK параллельно.

В этот момент fx блок становится ненужным, поскольку он предназначен для последовательных операций. Вы можете смешивать и сочетать оба.

 MonoK.async().fx.async {

  val result = 
    Dispatchers.IO
     .parMap(helloJoey(), helloMary()) { joe, mary -> ... }
     .bind()

  otherThing(result).bind()

}
  

Комментарии:

1. Я пробовал смешивать как последовательные, так и параллельные операции. Однако parMapN ожидает ForIO вместо ForMonoK … ??? Смотрите Обновленный вопрос

2. Вы правы, что extfun может быть определен только для ввода-вывода. Это то, что вам нужно: github.com/arrow-kt/arrow-fx/blob/master/arrow-fx/src/main/… Я полагаю, что у нас нет параллельных экземпляров для Reactor, поэтому может случиться так, что вам нужно будет использовать операторы Reactor для параллелизма.