#scala #akka #akka-stream
#scala #akka #akka-stream
Вопрос:
У меня есть функция, возвращающая a, Flow
логика которой включает передачу некоторых элементов графика вспомогательному приемнику, Sink
переданному в качестве параметра. Я хочу сохранить материализованное значение вспомогательного Sink
, чтобы я мог воздействовать на его значение при запуске сконструированного потока.
Вот приблизительная картина потока, который я создаю:
IN ~> (logic: In => Out) ~> Broadcast ~> AuxFilter ~> AuxSink
~> OutFilter ~> OUT
Пример кода:
case class Incoming()
trait Element
case class Outcoming() extends Element
case class Persistent() extends Element
def flow[Mat](auxSink: Sink[Persistent, Mat]): Flow[Incoming, Outcoming, NotUsed] = {
val isPersistent = Flow[Element].collect {
case persistent: Persistent => persistent
}
val isRunning = Flow[Element].collect {
case out: Outcoming => out
}
val magicFlow: Flow[Incoming, Element, NotUsed] = Flow[Incoming]
.map(_ => if (Random.nextBoolean()) Outcoming() else Persistent())
Flow.fromGraph {
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val magic = b.add(magicFlow)
val bcast = b.add(Broadcast[Element](2))
val sink = b.add(isRunning)
bcast.out(0) ~> isPersistent ~> auxSink
magic.out ~> bcast.in
bcast.out(1) ~> isRunning ~> sink.in
FlowShape(magic.in, sink.out)
}
}
}
Есть ли способ каким-либо образом передать auxSink
‘s Mat
в результирующий Flow
?
Спасибо.
Ответ №1:
Отвечая на мой собственный вопрос…
Нашел это! Источник Flow.alsoToMat
указал мне именно на ту логику, которая мне была нужна — чтобы получить доступ к материализованному значению вспомогательного графика (в моем случае auxSink
), нужно импортировать его форму в строящийся график, передав ее в качестве параметра в GraphDSL.create()
.
def flow[Mat](auxSink: Sink[Persistent, Mat]): Flow[Incoming, Outcoming, Mat] = {
val isPersistent = ...
val isRunning = ...
val magicFlow = ...
Flow.fromGraph {
GraphDSL.create(auxSink) { implicit b => aux =>
import GraphDSL.Implicits._
val magic = b.add(magicFlow)
val bcast = b.add(Broadcast[Element](2))
val sink = b.add(isRunning)
magic ~> bcast ~> isPersistent ~> aux
bcast ~> isRunning ~> sink
FlowShape(magic.in, sink.out)
}
}
}