#akka-stream
#akka-stream
Вопрос:
Я не думаю, что в настоящее время (Akka Stream 2.5.21) это возможно, и меня интересуют простые обходные пути, или люди, видящие это, могут быть частью самой библиотеки Akka Stream.
Мой текущий обходной путь:
/*
* Implement a flow that does the 'action' when there is a gap of minimum 'duration' in the stream.
*/
def onGap[T](duration: FiniteDuration, action: => Unit): Flow[T,T,NotUsed] = {
Flow[T]
.map(Some(_))
.keepAlive(duration, () => { action; None })
.collect{ case Some(x) => x }
}
То, что я хотел бы видеть, похоже на .keepAlive
, но запускается только один раз за разрыв, а не вводит запись в поток.
Другие подходы, которые я рассмотрел:
.idleTimeout(duration)
с помощьюSupervision.Decider
, но для этого потребуется создать отдельныйActorSystem
.GraphStage
но это всегда кажется сложным..
Мой вариант использования для этого — проанализировать (предположить), что потребление потока Kinesis достигло «текущего состояния» (были просмотрены исторические значения).
Был бы лучший способ?
Комментарии:
1. Я не знаю, что вы имеете в виду под igniting, но если вы хотите избежать введения записи в свой поток, вы могли бы использовать
alsoTo
, например.alsoTo(Flow.apply.keepAlive(1.second, () => "alert").filter(_ == "alert").to(Sink.foreach(/* your igniting logic here */)))
2. В итоге я понял, что в любом случае запуск чего-либо из потока был неправильным. Сообщение о том, что оно «готово», будет быстро перематываться через поток и не будет полезным. Таким образом, я действительно хочу, чтобы токен был введен в поток (но только один раз), когда поток показывает первые признаки бездействия. Ваш совет по ‘.alsoTo’ хорош — я должен ознакомиться с ним. 🙂