#java #jboss #jms
#java #jboss #jms
Вопрос:
У меня есть приложение JMS, которое пытается считывать данные из очереди JBosss. Я реализовал MessageListener
в своем классе и использовал onMessage()
для получения сообщений
public class JBossConnector implements MessageListener, AutoCloseable {}
Вот мой метод:
/**
* The listener method of JMS. It listens to messages from queue: 'jbossToAppia'
* If the message is of type MessageObject, then transfer that to Appia
*
* @param message JMS Message
*/
@Override
public void onMessage(Message message) {
// receive the message from jboss queue: 'jbossToAppia'
// then post it to appia
if (message instanceof ObjectMessage) {
try {
MessageObject messageObject = (MessageObject) ((ObjectMessage) message).getObject();
System.out.printf("JbossConnector: MessageObject received from JBOSS, %sn", messageObject.getMessageType());
component.onMessageFromJboss(properties.getProperty("target.sessionID"), messageObject);
} catch (MessageFormatException exception) {
logger.error(ExceptionHandler.getFormattedException(exception));
ExceptionHandler.printException(exception);
} catch (JMSException exception) {
ExceptionHandler.printException(exception);
restart();
}
} else {
System.out.printf("%s: MessageFormatException(Message is not of the format MessageObject)n", this.getClass().getSimpleName());
}
}
Всякий раз, когда я нахожу JMSException
, я пытаюсь перезапустить соединение JBoss (контекст, соединение, сеанс, получатель, отправитель). Я сомневаюсь, что прочитанное мной onMessage()
использует несколько потоков для получения сообщений из очереди (поправьте меня, если я ошибаюсь).
Когда соединение с очередью JBoss разрывается, будет по крайней мере несколько очередей, которые генерируют это исключение. Это означает, что все они будут пытаться restart()
установить соединение, что является пустой тратой времени ( restart()
сначала закрывает все соединения, присваивает переменным значение null, а затем пытается инициировать соединения).
Теперь я мог бы сделать что-то вроде
synchronized (this){
restart();
}
или используйте volatile
переменные. Но это не гарантирует, что другие потоки не будут пытаться restart()
, когда текущие потоки завершат restart()
операцию (снова поправьте меня, если я ошибаюсь).
Есть ли какое-либо решение для выполнения этой работы?
Ответ №1:
Файл onMessage()
a MessageListener
действительно запускается из собственного потока, поэтому вам понадобятся надлежащие средства управления параллелизмом. Я думаю, что самым простым решением было бы просто использовать java.util.concurrent.atomic.AtomicBoolean
. Например, в вашем restart()
методе вы могли бы сделать что-то вроде этого:
private void restart() {
AtomicBoolean restarting = new AtomicBoolean(false);
if (!restarting.getAndSet(true)) {
// restart connection, session, etc.
}
}
Это сделает restart()
метод эффективно идемпотентным. Несколько потоков смогут вызывать restart()
, но только первый поток, который его вызывает, фактически приведет к воссозданию ресурсов. Все остальные вызовы будут немедленно возвращены.
Комментарии:
1. большое вам спасибо. Я собирался все синхронизировать и создать кучу беспорядка