#scala #zio
#scala #zio
Вопрос:
(Надеюсь) простой вопрос по Scalaz Zio.
У меня есть некоторый старый код, который я переработал для Zio. Я хочу, чтобы один путь этого кода продолжал вести себя точно так, как это было:
- синхронный
- блокировка
- в текущем потоке (это жесткое требование)
Как я могу запустить IO
такой, чтобы он вел себя как старый блокирующий код?
В настоящее время я использую:
private lazy val blockingRts = new RTS {}
def runBlocking[E, A](io: IO[E, A]): Either[E, A] = {
blockingRts.unsafeRun(io.attempt)
}
Кажется, это помогает, но я далеко не уверен, что это правильно. На 100% ли это обратно совместимо со старым кодом?
Комментарии:
1. Особенно требование запустить его в текущем потоке звучит для меня очень сомнительно. Это похоже на фрагмент кода, который — с функциональной точки зрения — абсолютно не принадлежит
IO
. Но это просто мое внутреннее ощущение.2. Это в стороне — может быть, синхронизация — это то, что вы ищете?
Ответ №1:
Хорошо, я, наконец, заглянул под капот и реализовал кое-что, что, похоже, удовлетворяет моим требованиям:
/**
* Executes the IO synchronous and blocking on the current thread, thus running an IO
* without any of the advantages of IO. This can be useful for maintaining backwards compatibility.
* Rethrows any exception that was not handled by the IO's error handling.
*/
@throws
def runLegacy[E, A](io: IO[E, A]): Either[E, A] = {
syncBlockingRunTimeSystem.unsafeRunSync[Nothing, Either[E, A]](io.either) match {
case Exit.Success(v) => v
case Exit.Failure(Cause.Die(exception)) => throw exception
case Exit.Failure(Cause.Interrupt) => throw new InterruptedException
case Exit.Failure(fail) => throw FiberFailure(fail)
}
}
private lazy val syncBlockingRunTimeSystem = Runtime(
(),
PlatformLive.fromExecutor(new Executor {
override def yieldOpCount: Int = Int.MaxValue
override def metrics: Option[ExecutionMetrics] = None
override def submit(runnable: Runnable): Boolean = {
runnable.run()
true
}
override def here: Boolean = true
})
)
Я также написал пару тестов:
"runLegacy" should {
"run synchronous code in blocking fashion on current thread" in {
var runCount = 0
val io = IO.succeedLazy { runCount = 1 }
.map { _ => runCount =1 }
.flatMap { _ =>
runCount = 1
IO.effect {
runCount = 1
Thread.currentThread()
}
}
runCount shouldBe 0
runLegacy(io) shouldBe Right(Thread.currentThread())
runCount shouldBe 4
}
"run parallel code sequentially on current thread" in {
val ios = (1 to 500).map { i => IO.succeedLazy { i } }
runLegacy(IO.reduceAll(IO.succeed(0), ios) {
case (a, b) => a b
}) shouldBe Right((500 * 501) / 2)
}
"run many flatMaps without overflowing" in {
var runCount = 0
val io = IO.succeedLazy { runCount = 1 }
val manyIo = (1 to 9999).foldLeft(io) { case (acc, _) => acc.flatMap { _ => io } }
runLegacy(manyIo)
runCount shouldBe 10000
}
case object TestException extends Throwable
"handle sync blocking errors" in {
case object TestException extends Throwable
runLegacy(IO.effect(throw TestException)) shouldBe Left(TestException)
}
"rethrow unhandled exceptions" in {
assertThrows[TestException.type] {
runLegacy(IO.succeedLazy(throw TestException))
}
}
}