parMapN, который завершается, даже когда одна программа обнаруживает ошибку

#scala #scala-cats #io-monad

#scala #scala-cats #io-монада

Вопрос:

Используя parMapN , несколько IO программ могут выполняться параллельно, например:

 import cats.implicits._
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val ioA = IO(for(i <- 1 to 100) { println(s"A$i"); Thread.sleep(100) })
val ioB = IO(for(i <- 1 to 100) { println(s"B$i"); Thread.sleep(100) })
val ioC = IO(for(i <- 1 to 100) { println(s"C$i"); Thread.sleep(100) })

val program = (ioA, ioB, ioC).parMapN { (_, _, _) => () }

program.unsafeRunSync()
  

Пример вывода:

 A1
C1
B1
A2
C2
B2
A3
C3
B3
A4
C4
B4
A5
B5
C5
A6
B6
C6
A7
B7
C7
A8
...
  

Согласно документации, незавершенные задачи отменяются, если какая-либо из IO задач завершается сбоем. Каков наилучший способ изменить этот механизм, чтобы все IO программы все равно завершались?

В моем случае некоторые из IO программ ничего не возвращают ( IO[Unit] ), и я все еще хочу убедиться, что все выполняется до тех пор, пока не будет завершено или не возникнет ошибка.

Ответ №1:

Ну, я нашел один возможный ответ вскоре после публикации моего вопроса. Не уверен, что это лучший способ справиться с этим, но определение my IO s таким образом работает для меня:

 val ioA = IO(for(i <- 1 to 100) { println(s"A$i"); Thread.sleep(100) }).attempt
val ioB = IO(for(i <- 1 to 100) { println(s"B$i"); Thread.sleep(100) }).attempt
val ioC = IO(for(i <- 1 to 100) { println(s"C$i"); Thread.sleep(100) }).attempt
  

Ответ №2:

Насколько я заметил, в вашем примере кода нет ничего, вызывающего ошибку. Итак, у вас должен быть код, подобный приведенному ниже, чтобы иметь возможность видеть эту функцию:

 val ioA = IO(for(i <- 1 to 100) { println(s"A$i"); Thread.sleep(100) })
val ioB = IO.raiseError[Unit](new Exception("boom"))
val ioC = IO(for(i <- 1 to 100) { println(s"C$i"); Thread.sleep(100) })
  

Также это выглядит не очень хорошо, поскольку attempt функция изменит внутреннюю структуру на IO[Either[_,_]] , что не соответствует вашим намерениям, не так ли?