#scala #concurrency #atomic #scala-cats #cats-effect
#scala #Параллелизм #атомарный #scala-cats #cats-эффект
Вопрос:
Проблема: я пытаюсь решить проблему, когда мне нужно планировать каждые x минут, мне нужно обновить кеш, и возможны одновременные получения.
Опробованные решения:
- Использование TrieMap и ScheduledThreadPool Executor с эффектами Cats:
На самом деле я начал с использования TrieMap, поскольку он обеспечивает потокобезопасность, и использовал запланированный пул потоков для планирования обновления
import cats.Applicative.ops.toAllApplicativeOps
import cats.effect.concurrent.Ref
import cats.effect.{ExitCode, IO, IOApp}
import java.util.concurrent.{Executors, ScheduledExecutorService}
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random
object ExploreTrieMap extends IOApp {
def callForEvery[A](f: => Unit, d: FiniteDuration)
(implicit sc: ScheduledExecutorService): IO[Unit] = {
IO.cancelable {
cb =>
val r = new Runnable {
override def run(): Unit = cb(Right(f))
}
val scFut = sc.scheduleAtFixedRate(r, 0, d.length, d.unit)
IO(scFut.cancel(false)).void
}
}
val map = TrieMap.empty[String, String]
override def run(args: List[String]): IO[ExitCode] = {
implicit val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
for {
_ <- callForEvery(println(map.get("token")), 1 second)
_ <- callForEvery(println(map.put("token", Random.nextString(10))), 3 second)
} yield ExitCode.Success
}
}
- Использование волокон с эффектом Ref и Cats:
А затем создал чистое решение с эффектом cats.
Приведет ли приведенный ниже код к ошибке StackOverflow?
import cats.effect.concurrent.Ref
import cats.effect.{ContextShift, ExitCode, Fiber, IO, IOApp}
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random
object ExploreCatFiber extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
for {
ref <- Ref.of[IO, String]("")
s <- scheduleAndPopulate(ref, 1 minute)
r <- keepPollingUsingFiber(ref)
_ <- s.join
_ <- r.join
} yield ExitCode.Success
}
def populate(): Future[String] = Future.successful(Random.nextString(10))
val futPop = IO.fromFuture(IO(populate()))
def scheduleAndPopulate(r: Ref[IO, String], duration: FiniteDuration)(implicit cs: ContextShift[IO]): IO[Fiber[IO, Unit]] = {
(for {
_ <- IO(println("Scheduled For Populating Ref"))
res <- futPop
_ <- r.set(res)
_ <- IO.sleep(duration)
rS <- scheduleAndPopulate(r, duration)(cs)
_ <- rS.join
} yield ()).start(cs)
}
def keepPollingUsingFiber(r: Ref[IO, String])(implicit cs: ContextShift[IO]): IO[Fiber[IO, Unit]] = {
(for {
res <- r.get
_ <- IO(println(res))
_ <- IO.sleep(1 second)
w <- keepPollingUsingFiber(r)(cs)
_ <- w.join
} yield ()).start(cs)
}
}
Я пытаюсь обновить ссылку и использовать ссылку как параллельный кеш, который обновляется другим волокном. И я запускаю создание fiber с помощью рекурсии. Я знаю, что волокна можно использовать для стекобезопасных операций. В этом случае я присоединяюсь к старому созданному волокну. Поэтому хотелось бы понять, безопасен ли приведенный ниже код.
Обновление (решение из ответов, приведенных ниже)
Третье решение: на основе ввода одного из ответов. Вместо того, чтобы разветвляться для каждого рекурсивного вызова, разветвляйте его на вызывающем.
import cats.effect.concurrent.Ref
import cats.effect.{ContextShift, ExitCode, Fiber, IO, IOApp}
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random
object ExploreCatFiberWithIO extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
for {
ref <- Ref.of[IO, String]("")
s <- scheduleAndPopulateWithIO(ref, 1 second).start
r <- keepPollingUsingIO(ref).start
_ <- s.join
_ <- r.join
} yield ExitCode.Success
}
def populate(): Future[String] = Future.successful(Random.nextString(10))
val futPop = IO.fromFuture(IO(populate()))
def scheduleAndPopulateWithIO(r: Ref[IO, String], duration: FiniteDuration)(implicit cs: ContextShift[IO]): IO[Unit] = {
for {
_ <- IO(println("Scheduled For Populating Ref"))
res <- futPop
_ <- r.set(res)
_ <- IO.sleep(duration)
_ <- scheduleAndPopulateWithIO(r, duration)(cs)
} yield ()
}
def keepPollingUsingIO(r: Ref[IO, String])(implicit cs: ContextShift[IO]): IO[Unit] = {
(for {
res <- r.get
_ <- IO(println(res))
_ <- IO.sleep(1 second)
w <- keepPollingUsingIO(r)(cs)
} yield ())
}
}
Хотелось бы узнать плюсы и минусы подходов, рассмотренных выше.
Ответ №1:
Для второго подхода вы можете упростить его, не разветвляя Fiber
на scheduleAndPopulate
и keepPollingUsingFiber
. Вместо этого сохраните рекурсивный вызов и передайте их вызывающему. IO
безопасен для стека, поэтому рекурсивный вызов не взорвет стек.
start
Вы могли бы использовать для каждого из них, но для них это может быть проще. parTupled
Это вариант parMapN
, который разветвляет каждый эффект и собирает их результаты.
(Кроме того, в вашем коде вам не нужно передавать неявные значения, например cs
, явно, компилятор выведет их для вас.)
object ExploreCatFiber extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
for {
ref <- Ref.of[IO, String]("")
_ <- (scheduleAndPopulate(ref, 1 minute), keepPollingUsingFiber(ref)).parTupled
} yield ExitCode.Success
}
def populate(): Future[String] = Future.successful(Random.nextString(10))
val futPop = IO.fromFuture(IO(populate()))
def scheduleAndPopulate(r: Ref[IO, String], duration: FiniteDuration): IO[Unit] = {
(for {
_ <- IO(println("Scheduled For Populating Ref"))
res <- futPop
_ <- r.set(res)
_ <- IO.sleep(duration)
_ <- scheduleAndPopulate(r, duration)
} yield ()
}
def keepPollingUsingFiber(r: Ref[IO, String]): IO[Unit] = {
(for {
res <- r.get
_ <- IO(println(res))
_ <- IO.sleep(1 second)
_ <- keepPollingUsingFiber(r)
} yield ()
}
}
Комментарии:
1. Спасибо за объяснение. Не могли бы вы, пожалуйста, объяснить плюсы и минусы обоих вышеперечисленных подходов, заданных в вопросе?
2. Адам, я также обновил вопрос с помощью предложенного вами решения.