Потребители HornetQ получают сообщения последовательно

#java #hornetq

#java #hornetq

Вопрос:

Я хочу реализовать систему pub-sub с несколькими потребителями. Вот мой код отправки:

 ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), getConfigMap()));
    ClientSessionFactory factory = locator.createSessionFactory();
    ClientSession session = factory.createSession(true, true, 10);

    try {
        session.createQueue(channelName, channelName, true);
    } catch (HornetQException e) {
        if (e.getCode() != QUEUE_EXISTS) {
            throw e;
        }
    }

    ClientProducer producer = session.createProducer(channelName);
    ClientMessage message = session.createMessage(true);
    message.getBodyBuffer().writeString(messageText);

    session.start();

    producer.send(message);
    LoggerFactory.getLogger().info(String.format("Message sent to "%s".", channelName));

    session.close();
    factory.close();
    locator.close();
  

И получают:

 @Override
public <T extends Callback<String>> SessionObject initReceive(String channelName, Class<T> callback) throws Exception {
    ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), getConfigMap()));
    ClientSessionFactory factory = locator.createSessionFactory();
    ClientSession session = factory.createSession(true, true, 10);
    session.start();

    try {
        session.createQueue(channelName, channelName, true);
    } catch (HornetQException e) {
        if (e.getCode() != QUEUE_EXISTS) {
            throw e;
        }
    }

    ClientConsumer consumer = session.createConsumer(channelName);

    return new SessionObject(locator, factory, session, consumer);
}

/**
 * Receive message from specific channel
 *
 * @param channelName Name of the source channel
 * @param callback    Custom action to do with received message
 * @param <T>         an extension of Callback<String>
 * @throws Exception
 */
@Override
public <T extends Callback<String>> void receive(SessionObject sessionObject, String channelName, Class<T> callback, AcknowledgeCallback acknowledgeCallback) throws Exception {
    ClientMessage msgReceived = sessionObject.getConsumer().receive();

    if (!acknowledgeCallback.isAcknowledged(channelName)) {
        LoggerFactory.getLogger().info("Message received, but not acknowledged");
        return;
    }

    if (msgReceived != null) {
        msgReceived.acknowledge();
        LoggerFactory.getLogger().info("Message received.");
        if (callback != null) {
            callback.newInstance().run(msgReceived.getBodyBuffer().readString());
        }
    }
}
  

Но когда я пытаюсь запустить 2 или более потребителей, они работают последовательно, один работает, а остальные ждут сообщений.

Как я могу решить эту проблему?

Комментарии:

1. никаких исключений … ничего? Трудно дать вам ответ, не зная точно, что не работает

2. @Clebert исключений нет. Когда один потребитель работает, другой зависает на receive методе, и когда первый заканчивает свою работу, начинаются секунды. Это похоже на семафор или критический раздел, но я там такого не делал