#scala #zio
#scala #zio
Вопрос:
Следующий код должен выполнять putStrLn
эффект параллельно из-за mapMPar
:
val runtime = zio.Runtime.default
val foo = ZIO.sleep(5.second) *> ZIO("foo")
val bar = ZIO("bar")
val k = ZStream.fromEffect(foo) ZStream.fromEffect(bar)
val r = k.mapMPar(3)(x => console.putStrLn(s"Processing `${x}`"))
runtime.unsafeRun(r.runDrain)
Но на самом деле это всегда происходит foo
раньше bar
, несмотря ни на что. Я что-то пропустил или это ошибка?
Ответ №1:
Я думаю, что ваш пример просто не делает того, что вы ожидаете от него. fromEffect
создает поток, который в основном говорит «У меня есть эффект, который в конечном итоге сгенерирует один элемент», затем первый поток ожидает 5 секунд, прежде чем создать этот элемент. Из-за природы потока оператор
or concat
является ленивым, что означает, что он не может начать обработку, пока все элементы не будут использованы из первого потока (что не может произойти в течение 5 секунд). В результате ваш поток действительно выглядит следующим образом:
--5s--(foo)(bar)|
вместо того, что, я полагаю, вы думаете, что это должно понравиться:
(bar)--5s--(foo)|
Возможно, лучший способ подумать об этом заключается в том, что для большей части потока у вас есть однополосная магистраль, одновременно может перемещаться только один элемент, а все последующие элементы блокируются элементами в начале строки. Как только вы нажмете на этот Par
блок, вы открываете доступ к нескольким полосам движения, что означает, что более быстрые движущиеся объекты потенциально могут обогнать.
Таким образом, я могу добиться желаемого поведения, выполнив вместо этого что-то вроде этого:
val k = ZStream("foo", "bar")
val r = k.mapMPar(3)(x => putStrLn(s"$x:enter") *> (ZIO.sleep(5.second) *> putStrLn(s"Processing `${x}`")) <* putStrLn(s"$x:exit"))
r.runDrain
Или написан немного более компактно:
ZStream("foo", "bar").mapMPar(3)(x => for {
_ <- putStrLn(s"$x:enter")
_ <- ZIO.sleep(5.seconds) *> putStrLn(s"Processing `$x`")
_ <- putStrLn(s"$x:exit")
} yield ()).runDrain
Комментарии:
1. Это имеет смысл, но не похоже, что это должно работать в параллельном режиме. Потому что, допустим, у меня сложный код, и я решил использовать
mapMPar
в каком-то методе. В этом методе я на самом деле не знаю, как был создан этот поток, поэтому я не могу доверять, чтоmapMPar
он выполнит свою работу. Более того, такое поведение означает, что я не могу объединить потоки, если я хочу их распараллелить, что также звучит странно для меня.2. Это будет работать в параллельном режиме, проблема в том, где вы ожидаете, что параллелизм сработает. Вставка задержки для первого эффекта фактически не генерирует параллельное поведение, потому что вы задерживаете весь поток, поэтому оба элемента по-прежнему отправляются в очередь параллельной работы в том же порядке без асинхронной границы между ними, что означает, что они все равно будут выводиться в том же порядке.
3. «что я не могу объединить потоки, если я хочу распараллелить», Звучит так, как будто вы действительно хотите
merge
потоки, а не объединять их. И снова проблема заключается в том, как отправляются элементы, одновременно в рабочую очередь может быть отправлено только одно значение (другими словами, сериализованная обработка), просто этоmapNPar
позволяет обрабатывать несколько элементов параллельно внутри оператора