#scala #akka #actor #akka-http #akka-actor
#scala #akka #актер #akka-http #akka-actor
Вопрос:
Я пытаюсь разделить большой кусок текста на несколько абзацев и обрабатывать его одновременно, вызывая внешний API. Неизменяемый список обновляется каждый раз, когда приходит ответ от API для абзаца.
После обработки абзацев и обновления списка я хотел бы попросить Актера указать окончательный статус, который будет использоваться на следующих шагах.
Проблема с нижеприведенным подходом заключается в том, что я никогда не узнаю, когда обрабатываются все абзацы. Мне нужно вернуть targetStore после обработки всех абзацев и получения окончательного списка.
def main(args: Array[String]) {
val source = Source.fromFile("input.txt")
val extDelegator = new ExtractionDelegator()
source.getLines().foreach(line => extDelegator.processParagraph(line))
extDelegator.getFinalResult()
}
case class Extract(uuid: UUID, text: String)
case class UpdateList(text: String)
case class DelegateLambda(text: String)
case class FinalResult()
class ExtractionDelegator {
val system = ActorSystem("ExtractionDelegator")
val extActor = system.actorOf(Props(classOf[ExtractorDelegateActor]).withDispatcher("fixed-thread-pool"))
implicit val executionContext = system.dispatchers.lookup("fixed-thread-pool")
def processParagraph(text: String) = {
extActor ! Extract(uuid, text)
}
def getFinalResult(): java.util.List[String] = {
implicit val timeout = Timeout(5 seconds)
val askActor = system.actorOf(Props(classOf[ExtractorDelegateActor]))
val future = askActor ? FinalResult()
val result = Await.result(future, timeout.duration).asInstanceOf[java.util.List[String]]
result
}
def shutdown(): Unit = {
system.terminate()
}
}
/* Extractor Delegator actor*/
class ExtractorDelegateActor extends Actor with ActorLogging {
var targetStore:scala.collection.immutable.List[String] = scala.collection.immutable.List.empty
def receive = {
case Extract(uuid, text) => {
context.actorOf(Props[ExtractProcessor].withDispatcher("fixed-thread-pool")) ! DelegateLambda(text)
}
case UpdateList(res) => {
targetStore = targetStore : res
}
case FinalResult() => {
val senderActor=sender()
senderActor ! targetStore
}
}
}
/* Aggregator actor*/
class ExtractProcessor extends Actor with ActorLogging {
def receive = {
case DelegateLambda(text) => {
val res =callLamdaService(text)
sender ! UpdateList(res)
}
}
def callLamdaService(text: String): String = {
//THis is where external API is called.
Thread.sleep(1000)
result
}
}
Ответ №1:
Не уверен, почему вы хотите использовать актеров здесь, самым простым было бы
// because you call external service, you have back async response most probably
def callLamdaService(text: String): Future[String]
и для обработки вашего текста вы делаете
implicit val ec = scala.concurrent.ExecutionContext.Implicits.global // use you execution context here
Future.sequence(source.getLines().map(callLamdaService)).map {results =>
// do what you want with results
}
Если вы все еще хотите использовать актеров, вы можете сделать это, заменив callLamdaService
processParagraph
то, что внутренне будет сделано ask
для рабочего актера, который возвращает результат (таким образом, подпись для processParagraph
будет def processParagraph(text: String): Future[String]
)
Если вы все еще хотите запустить несколько задач, а затем запросить результат, то вам просто нужно использовать context.become
with receive(worker: Int)
, когда вы увеличиваете количество рабочих для каждого Extract
сообщения и уменьшаете количество рабочих для каждого UpdateList
сообщения. Вам также необходимо будет реализовать затем отложенную обработку FinalResult
для случая ненулевого количества обрабатывающих работников.