#java #hornetq
#java #hornetq
Вопрос:
Я хочу реализовать систему pub-sub с несколькими потребителями. Вот мой код отправки:
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), getConfigMap()));
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(true, true, 10);
try {
session.createQueue(channelName, channelName, true);
} catch (HornetQException e) {
if (e.getCode() != QUEUE_EXISTS) {
throw e;
}
}
ClientProducer producer = session.createProducer(channelName);
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString(messageText);
session.start();
producer.send(message);
LoggerFactory.getLogger().info(String.format("Message sent to "%s".", channelName));
session.close();
factory.close();
locator.close();
И получают:
@Override
public <T extends Callback<String>> SessionObject initReceive(String channelName, Class<T> callback) throws Exception {
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), getConfigMap()));
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(true, true, 10);
session.start();
try {
session.createQueue(channelName, channelName, true);
} catch (HornetQException e) {
if (e.getCode() != QUEUE_EXISTS) {
throw e;
}
}
ClientConsumer consumer = session.createConsumer(channelName);
return new SessionObject(locator, factory, session, consumer);
}
/**
* Receive message from specific channel
*
* @param channelName Name of the source channel
* @param callback Custom action to do with received message
* @param <T> an extension of Callback<String>
* @throws Exception
*/
@Override
public <T extends Callback<String>> void receive(SessionObject sessionObject, String channelName, Class<T> callback, AcknowledgeCallback acknowledgeCallback) throws Exception {
ClientMessage msgReceived = sessionObject.getConsumer().receive();
if (!acknowledgeCallback.isAcknowledged(channelName)) {
LoggerFactory.getLogger().info("Message received, but not acknowledged");
return;
}
if (msgReceived != null) {
msgReceived.acknowledge();
LoggerFactory.getLogger().info("Message received.");
if (callback != null) {
callback.newInstance().run(msgReceived.getBodyBuffer().readString());
}
}
}
Но когда я пытаюсь запустить 2 или более потребителей, они работают последовательно, один работает, а остальные ждут сообщений.
Как я могу решить эту проблему?
Комментарии:
1. никаких исключений … ничего? Трудно дать вам ответ, не зная точно, что не работает
2. @Clebert исключений нет. Когда один потребитель работает, другой зависает на
receive
методе, и когда первый заканчивает свою работу, начинаются секунды. Это похоже на семафор или критический раздел, но я там такого не делал