#scala #timer #actor
#scala #таймер #актер
Вопрос:
Мне нужен актер для отправки сообщения каждую минуту. Как мне лучше всего добиться такого поведения? Я боюсь использовать java.lang.Thread.sleep(long millis)
, поскольку поток может быть общим для многих участников в Scala, насколько я понимаю.
Ответ №1:
Или, как упоминал @Daniel, вот работающий пример:
import scala.actors._
import scala.actors.Actor._
class TimerActor(val timeout: Long,val who: Actor,val reply: Any) extends Actor {
def act {
loop {
reactWithin(timeout) {
case TIMEOUT => who ! reply
}
}
}
}
val a = actor {
loop {
react {
case x => println(x)
}
}
}
val t = new TimerActor(1000, a, "Go for it")
a.start
t.start
Комментарии:
1. Спасибо. Я принял ответ Дэниела, поскольку он был первым, но я очень ценю ваши усилия по его иллюстрации.
2. Код можно использовать и для одноразового таймера, но вам придется удалить
loop
, иначе ваши ответы начнут накапливаться.3. в этом примере
reactWithin(timeout)
ожидает только тайм-аута. если я также добавлюcase
для определенного сообщения, и будет доступно множество сообщений, будет ли сгенерировано событие ТАЙМ-АУТА? когда я запускаю тест, я не вижу никакого ТАЙМ-аута. как актер может получать сообщения, а также самостоятельно генерировать события ТАЙМЕРА для себя через регулярные промежутки времени? и я хочу, чтобы сообщения ТАЙМЕРА обрабатывались с приоритетом. поэтому я не могу нанять другого актера ТАЙМЕРА и отправлять события таймера моему актеру, так как они будут добавлены в хвост почтового ящика. Есть ли способ добавить сообщение ТАЙМЕРА в заголовок почтового ящика?4.@weima Вы должны использовать
TimerActor
только дляTIMEOUT
и не получать другие сообщения. Создайте новогоTimerActor
внутри вашего обычного актера сwho
= внешним обычным актером и позвольте обычному актеру обрабатыватьreply
сообщение каждыеtimeout
миллисекунды.5. спасибо @PeterSchmitz, я создал TimerActor в моем актере, но я получаю исключение «ошибка утверждения: получение с канала, принадлежащего другому актеру»
Ответ №2:
Создайте актер с receiveWithin
, который будет действовать как таймер.
Ответ №3:
Вы можете использовать Akka FSM для моделирования субъекта, который остается forMax
на миллисекунду в состоянии ожидания, а затем отправляет сообщение, например, переключаясь в другое состояние во время использования onTransition
и оставаясь там в течение 0 миллисекунд, чтобы вернуться в состояние ожидания. На странице akka есть хороший пример.
Комментарии:
1. Спасибо, но я бы хотел избежать Akka для этого проекта, я готов реализовать это с помощью обычных актеров Scala.
Ответ №4:
import scala.actors._
class Wakeup[A](millis: Int, who: ReplyReactor, alarm: A) extends Thread {
val done = new java.util.concurrent.atomic.AtomicBoolean(false)
override def run {
while (!done.get()) {
who ! alarm
Thread.sleep(millis)
}
}
}
case object BEEP {}
val a = new ReplyReactor { def act { loop { react {
case BEEP => println("Wha?! " new java.util.Date)
case _ =>
}}}}
val b = new Wakeup(60000,a,BEEP)
a.start
Зачем использовать актер, когда вам нужен поток?
scala> b.start
scala> Wha?! Mon Nov 07 18:43:18 EST 2011
Wha?! Mon Nov 07 18:44:18 EST 2011
Wha?! Mon Nov 07 18:45:18 EST 2011
Wha?! Mon Nov 07 18:46:18 EST 2011
Wha?! Mon Nov 07 18:47:18 EST 2011
Wha?! Mon Nov 07 18:48:18 EST 2011
Wha?! Mon Nov 07 18:49:18 EST 2011
Wha?! Mon Nov 07 18:50:18 EST 2011
Wha?! Mon Nov 07 18:51:18 EST 2011
Wha?! Mon Nov 07 18:52:18 EST 2011
Комментарии:
1. Разве это не плохо — отправлять сообщение актеру от не-актера? Существует вопрос с ответом, объясняющим, как это обычно бывает.
2. @Ivan — Это может быть нормально, если актер знает, что он не может ответить. Получать сообщения более рискованно, чем отправлять их.
3. Потому что актеры совместно используют потоки. Если вы приостанавливаете поток, вы приостанавливаете всех актеров, работающих в нем
4. @KrzysztofWende — Какое это имеет отношение к моему ответу или вопросу Ивана?
5. @RexKerr Явное использование потоков в программировании актеров крайне не рекомендуется и считается антишаблоном. Кроме того, использование потока для простого ожидания чего-либо действительно плохо сказывается на производительности.
Ответ №5:
Я закончил созданием выделенного работоспособного экземпляра, который продолжает отправлять сообщение целевому субъекту. Нравится
case class QueueTick()
class QueueWatcherActor extends Actor {
override def receive = {
case QueueTick() => // do it here
}
}
val ref = ActorSystem("xxx")
val actor = ref.actorOf(Props[QueueWatcherActor])
val executor = Executors.newSingleThreadScheduledExecutor()
executor.scheduleAtFixedRate(new Runnable {
def run() {
actor ! QueueTick()
}
},1,60,TimeUnit.SECONDS)
Ответ №6:
Поскольку scala.actors теперь устарел и заменяется на akka actors (и поскольку в akka actors нет react или receiveWithin), вот как это сделать, используя akka actors (на самом деле это меньше «взлома», чем использование receiveWithin в любом случае, ИМХО).
В приведенном ниже примере запланирован запуск runnable через 5 секунд:
import akka.actor.{ActorSystem, Scheduler}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContext.Implicits.global
class TimerExample {
def example() = {
def scheduler: Scheduler = ActorSystem.create("timer-example").scheduler
val myRunnable = new Runnable {
override def run(): Unit = {
println("run invoked")
}
}
println("scheduling...")
scheduler.scheduleOnce(FiniteDuration(5,TimeUnit.SECONDS),myRunnable)
Thread.sleep(6000)
println("should have printed 'run invoked'")
}