Akka BalancingPool: нет группового варианта?

#scala #akka #load-balancing

#scala #akka #балансировка нагрузки

Вопрос:

Я пытаюсь реализовать балансировку нагрузки для набора актеров Akka. Согласно документации, BalancingPool предлагает поведение, которое меня интересует (кража работы). По какой-то причине нет «группового варианта», который позволил бы мне самому создавать маршруты и передавать их маршрутизатору. В документах явно указано

Для BalancingPool нет группового варианта.

не говоря почему. Конструктор моего актера вызывается с аргументами, которые вычисляются во время выполнения, поэтому у меня нет другого выбора, кроме как создавать их программно.

 val resources:List[Any] = // ...
val system = ActorSystem("MySystem")
val routees = resources.map(r => system.actorOf(MyActor.props(r))
// This doesn't work for me, because every actor needs a resource!
val router = system.actorOf(BalancingPool(3).props(Props[MyActor]), "router")
 

Как мне создать BalancingPool маршрутизатор, который пересылает все сообщения моим участникам? Почему нет BalancingGroup ?

Комментарии:

1. Я предполагаю, что нет BalancingGroup, поскольку маршрутизаторы должны использовать один почтовый ящик. Однако, когда вы сами создаете маршруты, у каждого из них уже есть свой собственный почтовый ящик.

Ответ №1:

Я не думаю, что есть способ заставить BalancingPool работать с вашими актерами так, как написано, но вы можете изменить своих актеров так, чтобы они получали начальное сообщение, которое дает им информацию, необходимую для настройки. Если вы добавите этот класс:

 case class Setup(getResource: () => Any)
 

И эти строки вашему актеру:

 def receive = {
  case Setup(getResource) => {
    context.become(afterSetupReceive(getResource()))
  }
}

def afterSetupReceive(resource: Any) = {
  /* put cases from original recieve  here */
}
 

Вы можете создать свой собственный BalancedPool, например:

 val router = system.actorOf(BalancingPool(resources.length).props(Props[MyActor]), "router")
val resourcesIter = resources.iterator
val getResource = () => resourcesIter.synchronized {
  resourcesIter.next
}
router ! Broadcast(Setup(getResource))
 

Синхронизированная оболочка вокруг итератора отвратительна, но это позволит вам обойти ограничения BalancedPool .

Комментарии:

1. Большое вам спасибо! Никогда бы не подумал об этом сам 🙂 Однако я получаю странную ошибку: gist.github.com/anonymous/f8f09897e667db4d176b . Похоже, что BalancingPool создает актеров с недопустимыми именами?

2. @wingedsubmariner Я не уверен, что это работает надежно (из-за общего почтового ящика) (см. Также Это обсуждение ), и предполагается, что поддержка отправки широковещательных сообщений в BalancedPool будет удалена: github.com/akka/akka/issues/15030

3. @VolkerStampa Теперь, когда я думаю об этом, это имеет смысл. У вас есть предложения, как решить эту проблему в моем случае?

Ответ №2:

Поскольку трансляция в пул балансировки не является хорошей идеей, я придумал обходной путь. Вместо отправки широковещательной рассылки я создал синхронизированную очередь в сопутствующем объекте актера, содержащую ресурсы. При создании каждый участник удаляет элемент из этой очереди. Не очень приятное решение, но оно работает.

 // Router creation
val router = system.actorOf(BalancingPool(resources.length).props(Props[MyActor]), "router")

// In MyActor.scala
class MyActor {
  val resource = MyActor.resourceQueue.dequeue()

  // ...
}

object MyActor {
  val resourceQueue = {
    val queue = new mutable.SynchronizedQueue[MyResource]()
    queue.enqueue(...) // <-- resources
    queue
  }
}