#scala #concurrency #functional-programming #scala-cats
#scala #параллелизм #функциональное программирование #scala-кошки
Вопрос:
Я хотел бы применить эффективное вычисление к значению внутри MVar
or Ref
и атомарно обновить его в случае успешного выполнения вычисления или вернуть начальное значение (в случае MVar
) / просто ничего не делать (в случае Ref
) в случае сбоя операции.
I. Ref-case
val ref = Ref.of[IO, Int](0)
def foo(i: Int): IO[Int] = //... some effectual computation
Поскольку атомарность имеет значение и Ref
, к сожалению, не обеспечивает compareAndSet
работу, поэтому она должна быть реализована явно, что не выглядит привлекательным.
II. MVar-случай
MVar обеспечивает семантику взаимоисключения, но проблема в том, что bracket
это не позволяет нам put
вычислять значение. Вот пример:
val mvar = MVar.of[IO, Int](0)
def foo(i: Int): IO[Int] = IO(i 1)
for {
mvar <- mvar
i <- mvar.take.bracket(foo)(mvar.put) //puts back 0, not 1
} yield ()
Есть ли способ реализовать такое поведение хотя бы для любого MVar
или Ref
?
UPD:
Я реализовал это с MVar
помощью, но это выглядит довольно уродливо:
def updateAtomically(mvar: MVar[IO, Int], foo: Int => IO[Int]): IO[Int] = for {
i <- mvar.take
ii <- foo(i).onError{
case t => mvar.put(i)
}
_ <- mvar.put(ii)
} yield ii
Комментарии:
1. что не так с
for (i <- mvar.take; ii <- foo(i); _ <- mvar.put(ii)) yield ii
, безonError
?2. @Martijn Это оставит mvar пустым, вызывая взаимоблокировку.
3. «Уродливый» субъективен. «Довольно» имеет мало практической ценности. Вы можете сделать, например
mvar.take >>= (i => foo(i).attempt.map(_.fold(_ => i, ii => ii)).flatTap(mvar.put))
, но это автоматически не улучшит код.4. @MateuszKubuszok Ну, IMO ugly препятствует удобочитаемости, поэтому его приукрашивание может иметь некоторое значение…
Ответ №1:
Вы можете использовать F[B],bind:A=>F[B]):F[B]» rel=»nofollow noreferrer»> MonadError.redeemWith
для этого:
def updateAtomically(mvar: MVar[IO, Int], foo: Int => IO[Int]): IO[Int] =
for {
i <- mvar.take
ii <- foo(0).redeemWith(_ => IO(i), ii => mvar.put(ii) *> IO(ii))
} yield ii
И затем:
import cats.Applicative.ops.toAllApplicativeOps
import cats.effect.{ ExitCode, IO, IOApp }
import cats.effect.concurrent.MVar
object Foo extends IOApp {
def foo(i: Int): IO[Int] = IO(i 1)
def fooBar(i: Int): IO[Int] = IO.raiseError(new RuntimeException("BOOM"))
def run(args: List[String]): IO[ExitCode] =
(for {
mvar <- MVar.of[IO, Int](0)
res <- updateAtomically(mvar, foo)
_ <- IO(println(res))
} yield res).map(_ => ExitCode.Success)
}
Выдает:
1
И:
def run(args: List[String]): IO[ExitCode] =
(for {
mvar <- MVar.of[IO, Int](0)
res <- updateAtomically(mvar, fooBar)
_ <- IO(println(res))
} yield res).map(_ => ExitCode.Success)
Выдает:
0
Комментарии:
1. Выглядит намного приятнее, спасибо. Я думаю, что необходимо реализовать восстановление,
mvar.put(i) *> IO(i)
чтобы избежать пустого MVar.2. @SomeName — единственное место, где обновляется
MVar
внутренняяupdateAtomically
часть? Если да, то да.