Akka ConsistentHashingRoutingLogic не выполняет последовательную маршрутизацию в один и тот же поток диспетчера

#java #akka #consistent-hashing

#java #akka #согласованное хэширование

Вопрос:

Я пытаюсь использовать Akka ConsistentHashingRoutingLogic , чтобы гарантировать, что сообщения с одинаковым ключом перенаправляются одному и тому же участнику. Важно, чтобы сообщения с одним и тем же ключом обрабатывались в порядке FIFO. Сообщения с разными ключами могут быть перенаправлены разным участникам и свободно обрабатываться параллельно. Я не использую Akka в распределенном режиме.

Сообщения на самом деле являются сообщениями JSON, считываемыми из брокера RabbitMQ, поэтому мой главный исполнитель получает сообщение AMQP и использует ключ маршрутизации в качестве ключа сообщения. Тот же ключ также содержится в самом сообщении. Исполнитель является частью приложения Spring.

Мой главный исполнитель выглядит следующим образом:

 @Named("MessageHandlerMaster")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MessageHandlerMaster extends UntypedActor {

  private static final Logger log = LoggerFactory.getLogger(MessageHandlerMaster.class);

  private Router router;

  @Autowired
  public MessageHandlerMaster(final SpringProps springProps) {

  List<Routee> routees = Stream.generate(() -> {
      ActorRef worker = getContext().actorOf(springProps.create(MessageHandlerWorker.class));
      getContext().watch(worker);
      return new ActorRefRoutee(worker);
    }).limit(5) //todo: configurable number of workers
      .collect(Collectors.toList());

    router = new Router(new ConsistentHashingRoutingLogic(getContext().system()), routees);
  }

  public void onReceive(Object message) {
    if (message instanceof Message) {
      Message amqpMessage = (Message) message;
      String encoding = getMessageEncoding(amqpMessage);
      try {
        String json = new String(amqpMessage.getBody(), encoding);
        String routingKey = amqpMessage.getMessageProperties().getReceivedRoutingKey();
        log.debug("Routing message based on routing key "   routingKey);
        router.route(new ConsistentHashingRouter.ConsistentHashableEnvelope(json, routingKey), getSender());
      } catch (UnsupportedEncodingException e) {
        log.warn("Unknown content encoding sent in message! {}", encoding);
      }
    } else if (message instanceof Terminated) {
      //if one of the routee's died, remove it and replace it
      log.debug("Actor routee terminated!");
      router.removeRoutee(((Terminated) message).actor());
      ActorRef r = getContext().actorOf(Props.create(MessageHandlerWorker.class));
      getContext().watch(r);
      router = router.addRoutee(new ActorRefRoutee(r));
    }
  }

  private static String getMessageEncoding(Message message) {
    String encoding = message.getMessageProperties().getContentEncoding();
    if ((encoding == null) || (encoding.equals(""))) {
      encoding = "UTF-8";
    }
    return encoding;
  }
}
  

Я изначально получаю мастер один раз с помощью:

 this.master = actorSystem.actorOf(springProps.create(MessageHandlerMaster.class), "master");
  

а затем просто отправляет ему сообщения с помощью:

 master.tell(message, ActorRef.noSender());
  

Но когда я печатаю журналы из моего рабочего onReceive() , я вижу, что иногда для одного и того же ключа используются разные потоки диспетчера.

Также неясно, почему иногда один и тот же поток диспетчера используется для главного участника и для рабочего участника. Разве это не должно быть асинхронной передачей сообщений между потоками?

 16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.360 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
  

Как вы можете видеть здесь, поток диспетчера для рабочего, обрабатывающего сообщение с ключом 10420186, иногда был 9, а иногда 10. Главный исполнитель иногда также использовал эти 2 потока.

Как я могу быть уверен, что ConsistentHashingRoutingLogic действительно работает и один и тот же поток обрабатывает сообщения с одним и тем же ключом? Я делаю что-то не так при инициализации моего маршрутизатора?

Комментарии:

1. Я думаю, что участники не привязаны к потокам. Таким образом, в прикрепленном журнале нет ничего неправильного.

Ответ №1:

Итак, @vrudkovsk прав в своем комментарии. Я думаю, вы путаетесь между потоками и участниками. Действующие лица — это просто объекты в памяти, у которых есть адрес и почтовый ящик. Диспетчеры — это, по сути, пулы потоков, которые выполняют действия с субъектом. Примерами действий являются:

  • извлеките сообщение из очереди из почтового ящика, чтобы обработать его в субъекте
  • поставить сообщение в очередь на почтовый ящик.

Разные потоки могут выполнять действия для одного и того же субъекта. Это решает диспетчер. Akka гарантирует, что только один поток одновременно будет обрабатывать сообщение внутри субъекта. Это не означает, что это всегда будет один и тот же поток.

Если вы хотите убедиться, что они поступают к одному и тому же субъекту, я бы рекомендовал регистрировать путь или адрес субъекта с помощью context.self.path или context.self.path.address , поскольку это уникальные идентификаторы в пределах одного и того же ActorSystem .

Комментарии:

1. Спасибо за ваш ответ. Итак, в этом случае, есть ли у меня все еще уверенность в том, что если у меня есть 5 рабочих субъектов, независимо от того, какой поток используется для субъекта 1, поскольку я использую ConsistentHashingRoutingLogic , нет никакого способа, которым второй поток диспетчера получит другое сообщение, которое должно быть перенаправлено субъекту 1 до обработки предыдущего сообщения?

2. Это правильно. ConsistentHashingRoutingLogic Гарантирует, что ваше сообщение попадет к нужному участнику, независимо от того, какой поток выполняет работу.