Как настроить, чтобы очередь ActiveMQ Artemis не создавалась повторно

#activemq-artemis

Вопрос:

У меня есть встроенный брокер ActiveMQ Artemis 2.17.0, у которого нет предварительно настроенных очередей. Я бы хотел, чтобы клиент подключился к брокеру и автоматически создал свою очередь, если она не существует. Это работает просто отлично в 1-й раз, когда я запускаю клиент, но во второй раз, когда я снова подключаюсь к брокеру, клиент выдает ошибку :

 errorType=QUEUE_EXISTS message=AMQ229019: Queue hornetq already exists on address router
 

Можно ли настроить клиент или сервер таким образом, чтобы, если очередь уже существует, она не пыталась воссоздать ее снова?

Ниже приведены программы, которые я запускаю:

Сервер

 try {
    ActiveMQServer server = new ActiveMQServerImpl(new ConfigurationImpl()
         .setPersistenceEnabled(true)
         .setBindingsDirectory("./router/data/bindings")
         .setLargeMessagesDirectory("./router/data/large")
         .setPagingDirectory("./router/data/paging")
         .setJournalDirectory("./router/data/journal")
         .setSecurityEnabled(false)
         .addAcceptorConfiguration("tcp", "tcp://0.0.0.0:61617?protocols=CORE,AMQP"));
    server.start();
} catch (Exception ex) {
    System.err.println(ex);
}
 

Клиент

 ServerLocator serverLocator = ActiveMQClient.createServerLocator("tcp://127.0.0.1.3:61617");
ClientSessionFactory factory = serverLocator.createSessionFactory();
ClientSession session = factory.createSession();
session.createQueue(new QueueConfiguration("router::hornetq")
        .setAutoCreateAddress(Boolean.FALSE)
        .setAutoCreated(Boolean.FALSE)
        .setRoutingType(RoutingType.ANYCAST));
ClientProducer producer = session.createProducer("router::hornetq");
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString("Core Queue Message");
producer.send(message);
session.start();
ClientConsumer consumer = session.createConsumer("router::hornetq");
ClientMessage msgReceived = consumer.receive();
System.out.println("message = "   msgReceived.getBodyBuffer().readString());
session.close();
 

Я использую здесь полное имя очереди (т. Е. router::hornetq ), потому что у меня несколько очередей по router адресу.

Ответ №1:

Ваш клиент использует основной API, который является низкоуровневым API, не поддерживающим автоматическое создание очереди. Ваш клиент создает (или пытается создать) очередь вручную, например:

 session.createQueue(new QueueConfiguration("router::hornetq")
        .setAutoCreateAddress(Boolean.FALSE)
        .setAutoCreated(Boolean.FALSE)
        .setRoutingType(RoutingType.ANYCAST));
 

Исключение, которое вы видите, без сомнения, создается здесь, если очередь уже существует. Создание очереди не является идемпотентным, поэтому у вас есть 3 варианта из того, что я вижу:

  • Просто ловите ActiveMQQueueExistsException то, что бросают сюда, и игнорируйте это.
  • Используйте ClientSession.queueQuery , чтобы проверить, существует ли очередь, прежде чем пытаться ее создать. Если он существует, то не пытайтесь создать его снова. Если его не существует, то создайте его. Тем не менее, если у вас много таких клиентов, работающих одновременно, все еще возможно, что вы могли бы получить ActiveMQQueueExistsException из-за условий гонки между клиентами.
  • Используйте клиент/протокол, который поддерживает автоматическое создание, например, основной клиент JMS или AMQP.

Стоит упомянуть еще несколько вещей:

  • Поскольку вы явно не вызываете ClientSession.createAddress() , вы, вероятно, захотите использовать setAutoCreateAddress(Boolean.TRUE) .
  • Вашему потребителю не нужно использовать router::hornetq . Его можно просто использовать hornetq , и он будет получать любые сообщения, отправленные в hornetq очередь. Имена очередей универсально уникальны в брокере.

Комментарии:

1. Спасибо за оперативный ответ. Интересно, является ли отсутствие идемпотентности в core API преднамеренным или это то, что можно изучить ? Что касается имени очереди, у меня есть 2 очереди по одному и тому же адресу, отсюда явное именование.

2. Отсутствие идемпотентности обусловлено дизайном, хотя это не значит, что дизайн не мог измениться. Серверный API имеет createQueue флаг, позволяющий игнорировать вызов, если очередь уже существует.

3. Отмеченный. Еще один вопрос: мы использовали hornetq дольше всего, и теперь мы хотим перейти на artemis. Мы использовали основные API, но мы хотели бы использовать AMQP для размещения большего количества микросервисов на разных языках, есть ли заметная разница в производительности ?

4. Это действительно совершенно другой вопрос, чем тот, который вы изначально задали. Я рекомендую вам создать новый вопрос для ядра против Проблема с производительностью AMQP.

5. Я прокомментировал эту суть, которая может быть полезной.