разветвляется и присоединяется с помощью Akka

#scala #parallel-processing #future #akka #actor

#scala #параллельная обработка #будущее #akka #актер

Вопрос:

постановка задачи: У меня есть портфель ценных бумаг, которые необходимо обрабатывать параллельно. В Java я использовал threadpool для обработки каждой защиты и использовал защелку для обратного отсчета. После завершения я делаю некоторое слияние и т.д.

Итак, я отправляю сообщение моему SecurityProcessor (который является действующим лицом) и жду завершения всех фьючерсов. В конце я использую MergeHelper для выполнения последующей обработки. Security Processor принимает безопасность, выполняет некоторый ввод-вывод и обработку и отвечает на безопасность

   val listOfFutures = new ListBuffer[Future[Security]]()
  var portfolioResponse: Portfolio = _
  for (security <- portfolio.getSecurities.toList) {
    val securityProcessor = actorOf[SecurityProcessor].start()
    listOfFutures  = (securityProcessor ? security) map {
      _.asInstanceOf[Security]
    }
  }
  val futures = Future.sequence(listOfFutures.toList)
  futures.map {
    listOfSecurities =>
      portfolioResponse = MergeHelper.merge(portfolio, listOfSecurities)
  }.get
  

Верен ли этот дизайн, и есть ли лучший / более крутой способ реализовать эту распространенную проблему с использованием akka?

Ответ №1:

 val futureResult = Future.sequence(
                  portfolio.getSecurities.toList map { security => (actorOf[SecurityProcessor].start() ? security).mapTo[Security] }
                ) map { securities => MergeHelper.merge(portfolio, securities) }
  

Комментарии:

1. Действительно понравилось это предложение и работает, как ожидалось, пока мне не пришлось разделить его и добавить кучу Eventhandler.info инструкции для отладки проблемы:(

2. def debug[T](t: T): T = { EventHandler.info (t); t }

3. Я очень рад, что вам это нравится, пожалуйста, поделитесь своими слезами радости и / или боли в рассылочном списке Akka!