#scala #akka #load-balancing
#scala #akka #балансировка нагрузки
Вопрос:
Я пытаюсь реализовать балансировку нагрузки для набора актеров Akka. Согласно документации, BalancingPool
предлагает поведение, которое меня интересует (кража работы). По какой-то причине нет «группового варианта», который позволил бы мне самому создавать маршруты и передавать их маршрутизатору. В документах явно указано
Для BalancingPool нет группового варианта.
не говоря почему. Конструктор моего актера вызывается с аргументами, которые вычисляются во время выполнения, поэтому у меня нет другого выбора, кроме как создавать их программно.
val resources:List[Any] = // ...
val system = ActorSystem("MySystem")
val routees = resources.map(r => system.actorOf(MyActor.props(r))
// This doesn't work for me, because every actor needs a resource!
val router = system.actorOf(BalancingPool(3).props(Props[MyActor]), "router")
Как мне создать BalancingPool
маршрутизатор, который пересылает все сообщения моим участникам? Почему нет BalancingGroup
?
Комментарии:
1. Я предполагаю, что нет BalancingGroup, поскольку маршрутизаторы должны использовать один почтовый ящик. Однако, когда вы сами создаете маршруты, у каждого из них уже есть свой собственный почтовый ящик.
Ответ №1:
Я не думаю, что есть способ заставить BalancingPool
работать с вашими актерами так, как написано, но вы можете изменить своих актеров так, чтобы они получали начальное сообщение, которое дает им информацию, необходимую для настройки. Если вы добавите этот класс:
case class Setup(getResource: () => Any)
И эти строки вашему актеру:
def receive = {
case Setup(getResource) => {
context.become(afterSetupReceive(getResource()))
}
}
def afterSetupReceive(resource: Any) = {
/* put cases from original recieve here */
}
Вы можете создать свой собственный BalancedPool, например:
val router = system.actorOf(BalancingPool(resources.length).props(Props[MyActor]), "router")
val resourcesIter = resources.iterator
val getResource = () => resourcesIter.synchronized {
resourcesIter.next
}
router ! Broadcast(Setup(getResource))
Синхронизированная оболочка вокруг итератора отвратительна, но это позволит вам обойти ограничения BalancedPool
.
Комментарии:
1. Большое вам спасибо! Никогда бы не подумал об этом сам 🙂 Однако я получаю странную ошибку: gist.github.com/anonymous/f8f09897e667db4d176b . Похоже, что BalancingPool создает актеров с недопустимыми именами?
2. @wingedsubmariner Я не уверен, что это работает надежно (из-за общего почтового ящика) (см. Также Это обсуждение ), и предполагается, что поддержка отправки широковещательных сообщений в BalancedPool будет удалена: github.com/akka/akka/issues/15030
3. @VolkerStampa Теперь, когда я думаю об этом, это имеет смысл. У вас есть предложения, как решить эту проблему в моем случае?
Ответ №2:
Поскольку трансляция в пул балансировки не является хорошей идеей, я придумал обходной путь. Вместо отправки широковещательной рассылки я создал синхронизированную очередь в сопутствующем объекте актера, содержащую ресурсы. При создании каждый участник удаляет элемент из этой очереди. Не очень приятное решение, но оно работает.
// Router creation
val router = system.actorOf(BalancingPool(resources.length).props(Props[MyActor]), "router")
// In MyActor.scala
class MyActor {
val resource = MyActor.resourceQueue.dequeue()
// ...
}
object MyActor {
val resourceQueue = {
val queue = new mutable.SynchronizedQueue[MyResource]()
queue.enqueue(...) // <-- resources
queue
}
}