akka-поток потоков с отслеживанием состояния подпотока

#scala #akka #akka-stream

Вопрос:

Существует поток с отслеживанием состояния:

 val stream = Flow[Event].statefulMapConcat {
  () =>

    val state = ...

    {
      element =>
        // change the state
        element :: Nil
    }
}
 

и это часть потока

 Flow[Event]
  .groupBy(1000000, event => event.key2, allowClosedSubstreamRecreation = true)
  .via(stream)
  .mergeSubstreams

 

Есть ли какой-либо способ получить state вход для stream каждого подпотока (в этом примере для каждого ключа после groupBy)?
Я думаю, что это должно быть материализовано для каждого подпотока, но не знаю, как это сделать.

Ответ №1:

В этой настройке вы получаете состояние для каждого подпотока:

   val stream = Flow[Int].statefulMapConcat {
    () => {

      var state: List[Int] = Nil

      element => {
        state = element :: state
        List(state)
      }
    }
  }

  val groupByFlow =
  Flow[Int]
    .groupBy(1000000, identity, allowClosedSubstreamRecreation = true)
    .via(stream)
    .mergeSubstreams

  Source(List(1,1,2,3,3,3))
    .via(groupByFlow)
    .runForeach(i => println(i))
 

напечатает

 List(1)
List(1, 1)
List(3)
List(2)
List(3, 3)
List(3, 3, 3)