#scala #asynchronous #akka #future #polling
#scala #асинхронный #akka #будущее #опрос
Вопрос:
Я хочу опрашивать конечную точку API, пока она не достигнет некоторого состояния. Я ожидаю, что он достигнет этого состояния через пару секунд до минуты. У меня есть метод для вызова конечной точки, которая возвращает a Future
. Есть ли какой-нибудь способ объединить Future
s вместе, чтобы опрашивать эту конечную точку каждые n
миллисекунды и сдаваться после t
попыток?
Предположим, у меня есть функция со следующей подписью:
def isComplete(): Future[Boolean] = ???
Самым простым способом сделать это, на мой взгляд, было бы сделать все блокирующим:
def untilComplete(): Unit = {
for { _ <- 0 to 10 } {
val status = Await.result(isComplete(), 1.seconds)
if (status) return Unit
Thread.sleep(100)
}
throw new Error("Max attempts")
}
Но это может занимать все потоки, и это не асинхронно. Я также рассматривал возможность рекурсивного выполнения:
def untilComplete(
f: Future[Boolean] = Future.successful(false),
attempts: Int = 10
): Future[Unit] = f flatMap { status =>
if (status) Future.successful(Unit)
else if (attempts == 0) throw new Error("Max attempts")
else {
Thread.sleep(100)
untilComplete(isComplete(), attempts - 1)
}
}
Тем не менее, я обеспокоен максимальным использованием стека вызовов, поскольку это не является хвостовой рекурсией.
Есть ли лучший способ сделать это?
Редактировать: я использую akka
Комментарии:
1. Для того, что вам нужно, вы можете рассмотреть возможность использования планировщика Akka.
Ответ №1:
Вы могли бы использовать потоки Akka. Например, вызывать isComplete
каждые 500 миллисекунд, пока результат Future
не станет истинным, максимум пять раз:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future
import scala.concurrent.duration._
def isComplete(): Future[Boolean] = ???
implicit val system = ActorSystem("MyExample")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val stream: Future[Option[Boolean]] =
Source(1 to 5)
.throttle(1, 500 millis)
.mapAsync(parallelism = 1)(_ => isComplete())
.takeWhile(_ == false, true)
.runWith(Sink.lastOption)
stream onComplete { result =>
println(s"Stream completed with result: $result")
system.terminate()
}
Комментарии:
1. Это отличный ответ. Мне действительно нужны некоторые данные,
isComplete
которые не представлены в моей упрощенной реализации, и я получил эти данные, используяinclusive = true
takeWhile
иSink.lastOption
. Оставляя это сообщение, чтобы другие тоже могли это сделать.2. Это не гарантирует, что мы не попробуем дальше, если получим успешный результат, прежде чем сделать все 5 попыток, не так ли?
Ответ №2:
На самом деле это вообще не рекурсивно, поэтому со стеком все будет в порядке.
Одно из улучшений вашего подхода, о котором я могу подумать, — это использовать какой-то планировщик вместо Thread.sleep
того, чтобы не задерживать поток.
В этом примере используются стандартные java TimerTask
, но если вы используете какой-то фреймворк, такой как akka, play или что-то еще, у него, вероятно, есть свой собственный планировщик, который был бы лучшей альтернативой.
object Scheduler {
val timer = new Timer(true)
def after[T](d: Duration)(f :=> Future[T]): Future[T] = {
val promise = Promise[T]()
timer.schedule(TimerTask { def run() = promise.completeWith(f) }, d.toMillis)
promise.future
}
}
def untilComplete(attempts: Int = 10) = isComplete().flatMap {
case true => Future.successful(())
case false if attempts > 1 => Scheduler.after(100 millis)(untilComplete(attempts-1))
case _ => throw new Exception("Attempts exhausted.")
}
Комментарии:
1. Спасибо за ответ. Я использую akka. Будет
after
ли из akka. шаблон вместо вашейScheduler.after
работы в этом примере?2. Конечно, это идеально
3. Кроме того, как это не рекурсивно?
untilComplete
вызывает себя4. (
untilComplete
проситScheduler
вызвать себя в другомThread
, с другим стеком вызовов.)5. @user1943992 что сказал Стив 🙂 На самом деле он не вызывает себя, он создает анонимную функцию, вызывающую саму себя, которую затем передает в качестве аргумента
Scheduler
, который в какой-то момент вызовет ее после завершения этого вызова. Но что более важно, все это происходит внутри анонимной функции, переданной.flatMap
, которая сама будет вызвана после завершения первого вызова. По этой причине ваш исходный фрагмент также не является рекурсивным, даже если он не используетScheduler
Ответ №3:
Я предоставил себе библиотеку для этого. У меня есть
trait Poller extends AutoCloseable {
def addTask[T]( task : Poller.Task[T] ) : Future[T]
def close() : Unit
}
где a Poller.Task
выглядит как
class Task[T]( val label : String, val period : Duration, val pollFor : () => Option[T], val timeout : Duration = Duration.Inf )
Poller
Опрос выполняется каждый period
раз, пока pollFor
метод не завершится успешно (не даст Some[T]
результат) или timeout
не будет превышен.
Для удобства, когда я начинаю опрос, я оборачиваю это в Poller.Task.withDeadline
:
final case class withDeadline[T] ( task : Task[T], deadline : Long ) {
def timedOut = deadline >= 0 amp;amp; System.currentTimeMillis > deadline
}
который преобразует (неизменяемый, повторно используемый) timeout
Продолжительность задачи до крайнего срока для каждой попытки опроса для тайм-аута.
Чтобы эффективно выполнять опрос, я использую Java ScheduledExecutorService
:
def addTask[T]( task : Poller.Task[T] ) : Future[T] = {
val promise = Promise[T]()
scheduleTask( Poller.Task.withDeadline( task ), promise )
promise.future
}
private def scheduleTask[T]( twd : Poller.Task.withDeadline[T], promise : Promise[T] ) : Unit = {
if ( isClosed ) {
promise.failure( new Poller.ClosedException( this ) )
} else {
val task = twd.task
val deadline = twd.deadline
val runnable = new Runnable {
def run() : Unit = {
try {
if ( ! twd.timedOut ) {
task.pollFor() match {
case Some( value ) => promise.success( value )
case None => Abstract.this.scheduleTask( twd, promise )
}
} else {
promise.failure( new Poller.TimeoutException( task.label, deadline ) )
}
}
catch {
case NonFatal( unexpected ) => promise.failure( unexpected )
}
}
}
val millis = task.period.toMillis
ses.schedule( runnable, millis, TimeUnit.MILLISECONDS )
}
}
Кажется, это работает хорошо, не требуя сна или блокировки отдельных Threads
пользователей.
(Глядя на библиотеку, можно было бы многое сделать, чтобы сделать ее более понятной, удобной для чтения, и роль Poller.Task.withDeadline
была бы прояснена путем создания необработанного конструктора для этого класса private
. Крайний срок всегда должен вычисляться на основе задачи timeout
, он не должен быть произвольной свободной переменной.)
Этот код взят отсюда (фреймворк и признак) и здесь (реализация). (Если вы хотите использовать его напрямую, координаты maven находятся здесь.)