#scala #akka #akka-stream
Вопрос:
Существует поток с отслеживанием состояния:
val stream = Flow[Event].statefulMapConcat {
() =>
val state = ...
{
element =>
// change the state
element :: Nil
}
}
и это часть потока
Flow[Event]
.groupBy(1000000, event => event.key2, allowClosedSubstreamRecreation = true)
.via(stream)
.mergeSubstreams
Есть ли какой-либо способ получить state
вход для stream
каждого подпотока (в этом примере для каждого ключа после groupBy)?
Я думаю, что это должно быть материализовано для каждого подпотока, но не знаю, как это сделать.
Ответ №1:
В этой настройке вы получаете состояние для каждого подпотока:
val stream = Flow[Int].statefulMapConcat {
() => {
var state: List[Int] = Nil
element => {
state = element :: state
List(state)
}
}
}
val groupByFlow =
Flow[Int]
.groupBy(1000000, identity, allowClosedSubstreamRecreation = true)
.via(stream)
.mergeSubstreams
Source(List(1,1,2,3,3,3))
.via(groupByFlow)
.runForeach(i => println(i))
напечатает
List(1)
List(1, 1)
List(3)
List(2)
List(3, 3)
List(3, 3, 3)