Эффективное обновление Ref / MVar

#scala #concurrency #functional-programming #scala-cats

#scala #параллелизм #функциональное программирование #scala-кошки

Вопрос:

Я хотел бы применить эффективное вычисление к значению внутри MVar or Ref и атомарно обновить его в случае успешного выполнения вычисления или вернуть начальное значение (в случае MVar ) / просто ничего не делать (в случае Ref ) в случае сбоя операции.

I. Ref-case

 val ref = Ref.of[IO, Int](0)

def foo(i: Int): IO[Int] = //... some effectual computation
  

Поскольку атомарность имеет значение и Ref , к сожалению, не обеспечивает compareAndSet работу, поэтому она должна быть реализована явно, что не выглядит привлекательным.

II. MVar-случай

MVar обеспечивает семантику взаимоисключения, но проблема в том, что bracket это не позволяет нам put вычислять значение. Вот пример:

 val mvar = MVar.of[IO, Int](0)

def foo(i: Int): IO[Int] = IO(i   1)

for {
  mvar <- mvar
  i <- mvar.take.bracket(foo)(mvar.put) //puts back 0, not 1
} yield ()
  

Есть ли способ реализовать такое поведение хотя бы для любого MVar или Ref ?

UPD:

Я реализовал это с MVar помощью, но это выглядит довольно уродливо:

 def updateAtomically(mvar: MVar[IO, Int], foo: Int => IO[Int]): IO[Int] = for {
  i <- mvar.take
  ii <- foo(i).onError{
    case t => mvar.put(i)
  }
  _ <- mvar.put(ii)
} yield ii
  

Комментарии:

1. что не так с for (i <- mvar.take; ii <- foo(i); _ <- mvar.put(ii)) yield ii , без onError ?

2. @Martijn Это оставит mvar пустым, вызывая взаимоблокировку.

3. «Уродливый» субъективен. «Довольно» имеет мало практической ценности. Вы можете сделать, например mvar.take >>= (i => foo(i).attempt.map(_.fold(_ => i, ii => ii)).flatTap(mvar.put)) , но это автоматически не улучшит код.

4. @MateuszKubuszok Ну, IMO ugly препятствует удобочитаемости, поэтому его приукрашивание может иметь некоторое значение…

Ответ №1:

Вы можете использовать F[B],bind:A=>F[B]):F[B]» rel=»nofollow noreferrer»> MonadError.redeemWith для этого:

 def updateAtomically(mvar: MVar[IO, Int], foo: Int => IO[Int]): IO[Int] =
  for {
    i  <- mvar.take
    ii <- foo(0).redeemWith(_ => IO(i), ii => mvar.put(ii) *> IO(ii))
  } yield ii
  

И затем:

 import cats.Applicative.ops.toAllApplicativeOps
import cats.effect.{ ExitCode, IO, IOApp }
import cats.effect.concurrent.MVar

object Foo extends IOApp {

  def foo(i: Int): IO[Int] = IO(i   1)
  def fooBar(i: Int): IO[Int] = IO.raiseError(new RuntimeException("BOOM"))

  def run(args: List[String]): IO[ExitCode] =
    (for {
      mvar <- MVar.of[IO, Int](0)
      res  <- updateAtomically(mvar, foo)
      _    <- IO(println(res))
    } yield res).map(_ => ExitCode.Success)
}
  

Выдает:

 1
  

И:

 def run(args: List[String]): IO[ExitCode] =
  (for {
     mvar <- MVar.of[IO, Int](0)
     res  <- updateAtomically(mvar, fooBar)
     _    <- IO(println(res))
   } yield res).map(_ => ExitCode.Success)
  

Выдает:

 0
  

Комментарии:

1. Выглядит намного приятнее, спасибо. Я думаю, что необходимо реализовать восстановление, mvar.put(i) *> IO(i) чтобы избежать пустого MVar.

2. @SomeName — единственное место, где обновляется MVar внутренняя updateAtomically часть? Если да, то да.