Потоки Akka — Как сохранить материализованное значение вспомогательного приемника в графике

#scala #akka #akka-stream

#scala #akka #akka-stream

Вопрос:

У меня есть функция, возвращающая a, Flow логика которой включает передачу некоторых элементов графика вспомогательному приемнику, Sink переданному в качестве параметра. Я хочу сохранить материализованное значение вспомогательного Sink , чтобы я мог воздействовать на его значение при запуске сконструированного потока.

Вот приблизительная картина потока, который я создаю:

 IN ~> (logic: In => Out) ~> Broadcast ~> AuxFilter ~> AuxSink
                                      ~> OutFilter ~> OUT
  

Пример кода:

 case class Incoming()
trait Element
case class Outcoming() extends Element
case class Persistent() extends Element

def flow[Mat](auxSink: Sink[Persistent, Mat]): Flow[Incoming, Outcoming, NotUsed] = {
  val isPersistent = Flow[Element].collect {
    case persistent: Persistent => persistent
  }

  val isRunning = Flow[Element].collect {
    case out: Outcoming => out
  }

  val magicFlow: Flow[Incoming, Element, NotUsed] = Flow[Incoming]
    .map(_ => if (Random.nextBoolean()) Outcoming() else Persistent())

  Flow.fromGraph {
    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val magic = b.add(magicFlow)
      val bcast = b.add(Broadcast[Element](2))
      val sink = b.add(isRunning)

                   bcast.out(0) ~> isPersistent ~> auxSink
      magic.out ~> bcast.in
                   bcast.out(1) ~> isRunning ~> sink.in

      FlowShape(magic.in, sink.out)
    }
  }
}
  

Есть ли способ каким-либо образом передать auxSink ‘s Mat в результирующий Flow ?

Спасибо.

Ответ №1:

Отвечая на мой собственный вопрос…

Нашел это! Источник Flow.alsoToMat указал мне именно на ту логику, которая мне была нужна — чтобы получить доступ к материализованному значению вспомогательного графика (в моем случае auxSink ), нужно импортировать его форму в строящийся график, передав ее в качестве параметра в GraphDSL.create() .

 def flow[Mat](auxSink: Sink[Persistent, Mat]): Flow[Incoming, Outcoming, Mat] = {
  val isPersistent = ...
  val isRunning = ...
  val magicFlow = ...

  Flow.fromGraph {
    GraphDSL.create(auxSink) { implicit b => aux =>
      import GraphDSL.Implicits._

      val magic = b.add(magicFlow)
      val bcast = b.add(Broadcast[Element](2))
      val sink = b.add(isRunning)

      magic ~> bcast ~> isPersistent ~> aux
               bcast ~> isRunning ~> sink

      FlowShape(magic.in, sink.out)
    }
  }
}