Как обнаружить разрыв определенной продолжительности в потоке Akka?

#akka-stream

#akka-stream

Вопрос:

Я не думаю, что в настоящее время (Akka Stream 2.5.21) это возможно, и меня интересуют простые обходные пути, или люди, видящие это, могут быть частью самой библиотеки Akka Stream.

Мой текущий обходной путь:

 /*
* Implement a flow that does the 'action' when there is a gap of minimum 'duration' in the stream.
*/
def onGap[T](duration: FiniteDuration, action: => Unit): Flow[T,T,NotUsed] = {
  Flow[T]
    .map(Some(_))
    .keepAlive(duration, () => { action; None })
    .collect{ case Some(x) => x }
}
  

То, что я хотел бы видеть, похоже на .keepAlive , но запускается только один раз за разрыв, а не вводит запись в поток.

Другие подходы, которые я рассмотрел:

  • .idleTimeout(duration) с помощью Supervision.Decider , но для этого потребуется создать отдельный ActorSystem .
  • GraphStage но это всегда кажется сложным..

Мой вариант использования для этого — проанализировать (предположить), что потребление потока Kinesis достигло «текущего состояния» (были просмотрены исторические значения).

Был бы лучший способ?

Комментарии:

1. Я не знаю, что вы имеете в виду под igniting, но если вы хотите избежать введения записи в свой поток, вы могли бы использовать alsoTo , например .alsoTo(Flow.apply.keepAlive(1.second, () => "alert").filter(_ == "alert").to(Sink.foreach(/* your igniting logic here */)))

2. В итоге я понял, что в любом случае запуск чего-либо из потока был неправильным. Сообщение о том, что оно «готово», будет быстро перематываться через поток и не будет полезным. Таким образом, я действительно хочу, чтобы токен был введен в поток (но только один раз), когда поток показывает первые признаки бездействия. Ваш совет по ‘.alsoTo’ хорош — я должен ознакомиться с ним. 🙂