ZStream игнорирует параллельную операцию и выполняет ее последовательно вместо этого

#scala #zio

#scala #zio

Вопрос:

Следующий код должен выполнять putStrLn эффект параллельно из-за mapMPar :

 val runtime = zio.Runtime.default
val foo = ZIO.sleep(5.second) *> ZIO("foo")
val bar = ZIO("bar")

val k = ZStream.fromEffect(foo)    ZStream.fromEffect(bar)
val r = k.mapMPar(3)(x => console.putStrLn(s"Processing `${x}`"))

runtime.unsafeRun(r.runDrain)
  

Но на самом деле это всегда происходит foo раньше bar , несмотря ни на что. Я что-то пропустил или это ошибка?

Ответ №1:

Я думаю, что ваш пример просто не делает того, что вы ожидаете от него. fromEffect создает поток, который в основном говорит «У меня есть эффект, который в конечном итоге сгенерирует один элемент», затем первый поток ожидает 5 секунд, прежде чем создать этот элемент. Из-за природы потока оператор or concat является ленивым, что означает, что он не может начать обработку, пока все элементы не будут использованы из первого потока (что не может произойти в течение 5 секунд). В результате ваш поток действительно выглядит следующим образом:

 --5s--(foo)(bar)|
  

вместо того, что, я полагаю, вы думаете, что это должно понравиться:

 (bar)--5s--(foo)|
  

Возможно, лучший способ подумать об этом заключается в том, что для большей части потока у вас есть однополосная магистраль, одновременно может перемещаться только один элемент, а все последующие элементы блокируются элементами в начале строки. Как только вы нажмете на этот Par блок, вы открываете доступ к нескольким полосам движения, что означает, что более быстрые движущиеся объекты потенциально могут обогнать.

Таким образом, я могу добиться желаемого поведения, выполнив вместо этого что-то вроде этого:

 val k = ZStream("foo", "bar")
val r = k.mapMPar(3)(x => putStrLn(s"$x:enter") *> (ZIO.sleep(5.second) *> putStrLn(s"Processing `${x}`")) <* putStrLn(s"$x:exit"))

r.runDrain
  

Или написан немного более компактно:

 ZStream("foo", "bar").mapMPar(3)(x => for {
  _ <- putStrLn(s"$x:enter")
  _ <- ZIO.sleep(5.seconds) *> putStrLn(s"Processing `$x`")
  _ <- putStrLn(s"$x:exit")
} yield ()).runDrain
  

Комментарии:

1. Это имеет смысл, но не похоже, что это должно работать в параллельном режиме. Потому что, допустим, у меня сложный код, и я решил использовать mapMPar в каком-то методе. В этом методе я на самом деле не знаю, как был создан этот поток, поэтому я не могу доверять, что mapMPar он выполнит свою работу. Более того, такое поведение означает, что я не могу объединить потоки, если я хочу их распараллелить, что также звучит странно для меня.

2. Это будет работать в параллельном режиме, проблема в том, где вы ожидаете, что параллелизм сработает. Вставка задержки для первого эффекта фактически не генерирует параллельное поведение, потому что вы задерживаете весь поток, поэтому оба элемента по-прежнему отправляются в очередь параллельной работы в том же порядке без асинхронной границы между ними, что означает, что они все равно будут выводиться в том же порядке.

3. «что я не могу объединить потоки, если я хочу распараллелить», Звучит так, как будто вы действительно хотите merge потоки, а не объединять их. И снова проблема заключается в том, как отправляются элементы, одновременно в рабочую очередь может быть отправлено только одно значение (другими словами, сериализованная обработка), просто это mapNPar позволяет обрабатывать несколько элементов параллельно внутри оператора