Как проводить опрос с будущим в Scala?

#scala #asynchronous #akka #future #polling

#scala #асинхронный #akka #будущее #опрос

Вопрос:

Я хочу опрашивать конечную точку API, пока она не достигнет некоторого состояния. Я ожидаю, что он достигнет этого состояния через пару секунд до минуты. У меня есть метод для вызова конечной точки, которая возвращает a Future . Есть ли какой-нибудь способ объединить Future s вместе, чтобы опрашивать эту конечную точку каждые n миллисекунды и сдаваться после t попыток?

Предположим, у меня есть функция со следующей подписью:

 def isComplete(): Future[Boolean] = ???
  

Самым простым способом сделать это, на мой взгляд, было бы сделать все блокирующим:

 def untilComplete(): Unit = {
  for { _ <- 0 to 10 } {
    val status = Await.result(isComplete(), 1.seconds)
    if (status) return Unit
    Thread.sleep(100)
  }
  throw new Error("Max attempts")
}
  

Но это может занимать все потоки, и это не асинхронно. Я также рассматривал возможность рекурсивного выполнения:

 def untilComplete(
    f: Future[Boolean] = Future.successful(false),
    attempts: Int = 10
  ): Future[Unit] = f flatMap { status =>
    if (status) Future.successful(Unit)
    else if (attempts == 0) throw new Error("Max attempts")
    else {
      Thread.sleep(100)
      untilComplete(isComplete(), attempts - 1)
    }
}
  

Тем не менее, я обеспокоен максимальным использованием стека вызовов, поскольку это не является хвостовой рекурсией.

Есть ли лучший способ сделать это?

Редактировать: я использую akka

Комментарии:

1. Для того, что вам нужно, вы можете рассмотреть возможность использования планировщика Akka.

Ответ №1:

Вы могли бы использовать потоки Akka. Например, вызывать isComplete каждые 500 миллисекунд, пока результат Future не станет истинным, максимум пять раз:

 import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future
import scala.concurrent.duration._

def isComplete(): Future[Boolean] = ???

implicit val system = ActorSystem("MyExample")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val stream: Future[Option[Boolean]] =
  Source(1 to 5)
    .throttle(1, 500 millis)
    .mapAsync(parallelism = 1)(_ => isComplete())
    .takeWhile(_ == false, true)
    .runWith(Sink.lastOption)

stream onComplete { result =>
  println(s"Stream completed with result: $result")
  system.terminate()
}
  

Комментарии:

1. Это отличный ответ. Мне действительно нужны некоторые данные, isComplete которые не представлены в моей упрощенной реализации, и я получил эти данные, используя inclusive = true takeWhile и Sink.lastOption . Оставляя это сообщение, чтобы другие тоже могли это сделать.

2. Это не гарантирует, что мы не попробуем дальше, если получим успешный результат, прежде чем сделать все 5 попыток, не так ли?

Ответ №2:

На самом деле это вообще не рекурсивно, поэтому со стеком все будет в порядке.

Одно из улучшений вашего подхода, о котором я могу подумать, — это использовать какой-то планировщик вместо Thread.sleep того, чтобы не задерживать поток.

В этом примере используются стандартные java TimerTask , но если вы используете какой-то фреймворк, такой как akka, play или что-то еще, у него, вероятно, есть свой собственный планировщик, который был бы лучшей альтернативой.

 object Scheduler {
   val timer = new Timer(true)
   def after[T](d: Duration)(f :=> Future[T]): Future[T] = {
     val promise = Promise[T]()
     timer.schedule(TimerTask { def run() = promise.completeWith(f) }, d.toMillis)
     promise.future
   }
}


def untilComplete(attempts: Int = 10) = isComplete().flatMap { 
   case true => Future.successful(())
   case false if attempts > 1 => Scheduler.after(100 millis)(untilComplete(attempts-1))
   case _ => throw new Exception("Attempts exhausted.") 
}
  

Комментарии:

1. Спасибо за ответ. Я использую akka. Будет after ли из akka. шаблон вместо вашей Scheduler.after работы в этом примере?

2. Конечно, это идеально

3. Кроме того, как это не рекурсивно? untilComplete вызывает себя

4. ( untilComplete просит Scheduler вызвать себя в другом Thread , с другим стеком вызовов.)

5. @user1943992 что сказал Стив 🙂 На самом деле он не вызывает себя, он создает анонимную функцию, вызывающую саму себя, которую затем передает в качестве аргумента Scheduler , который в какой-то момент вызовет ее после завершения этого вызова. Но что более важно, все это происходит внутри анонимной функции, переданной .flatMap , которая сама будет вызвана после завершения первого вызова. По этой причине ваш исходный фрагмент также не является рекурсивным, даже если он не использует Scheduler

Ответ №3:

Я предоставил себе библиотеку для этого. У меня есть

 trait Poller extends AutoCloseable {
  def addTask[T]( task : Poller.Task[T] ) : Future[T]
  def close() : Unit
}
  

где a Poller.Task выглядит как

 class Task[T]( val label : String, val period : Duration, val pollFor : () => Option[T], val timeout : Duration = Duration.Inf )
  

Poller Опрос выполняется каждый period раз, пока pollFor метод не завершится успешно (не даст Some[T] результат) или timeout не будет превышен.

Для удобства, когда я начинаю опрос, я оборачиваю это в Poller.Task.withDeadline :

 final case class withDeadline[T] ( task : Task[T], deadline : Long ) {
  def timedOut = deadline >= 0 amp;amp; System.currentTimeMillis > deadline
}
  

который преобразует (неизменяемый, повторно используемый) timeout Продолжительность задачи до крайнего срока для каждой попытки опроса для тайм-аута.

Чтобы эффективно выполнять опрос, я использую Java ScheduledExecutorService :

 def addTask[T]( task : Poller.Task[T] ) : Future[T] = {
  val promise = Promise[T]()
  scheduleTask( Poller.Task.withDeadline( task ), promise )
  promise.future
}

private def scheduleTask[T]( twd : Poller.Task.withDeadline[T], promise : Promise[T] ) : Unit = {
  if ( isClosed ) { 
    promise.failure( new Poller.ClosedException( this ) )
  } else {
    val task     = twd.task
    val deadline = twd.deadline

    val runnable = new Runnable {

      def run() : Unit = {
        try {
          if ( ! twd.timedOut ) {
            task.pollFor() match {
              case Some( value ) => promise.success( value )
              case None          => Abstract.this.scheduleTask( twd, promise )
            }
          } else {
            promise.failure( new Poller.TimeoutException( task.label, deadline ) )
          }
        }
        catch {
          case NonFatal( unexpected ) => promise.failure( unexpected )
        }
      }
    }

    val millis = task.period.toMillis
    ses.schedule( runnable, millis, TimeUnit.MILLISECONDS )
  }
}
  

Кажется, это работает хорошо, не требуя сна или блокировки отдельных Threads пользователей.

(Глядя на библиотеку, можно было бы многое сделать, чтобы сделать ее более понятной, удобной для чтения, и роль Poller.Task.withDeadline была бы прояснена путем создания необработанного конструктора для этого класса private . Крайний срок всегда должен вычисляться на основе задачи timeout , он не должен быть произвольной свободной переменной.)

Этот код взят отсюда (фреймворк и признак) и здесь (реализация). (Если вы хотите использовать его напрямую, координаты maven находятся здесь.)