Как я могу попытаться перезапустить () JMS-соединение в параллельной среде?

#java #jboss #jms

#java #jboss #jms

Вопрос:

У меня есть приложение JMS, которое пытается считывать данные из очереди JBosss. Я реализовал MessageListener в своем классе и использовал onMessage() для получения сообщений

 public class JBossConnector implements MessageListener, AutoCloseable {}
  

Вот мой метод:

 /**
 * The listener method of JMS. It listens to messages from queue: 'jbossToAppia'
 * If the message is of type MessageObject, then transfer that to Appia
 *
 * @param message JMS Message
 */
@Override
public void onMessage(Message message) {

    // receive the message from jboss queue: 'jbossToAppia'
    // then post it to appia
    if (message instanceof ObjectMessage) {
        try {

            MessageObject messageObject = (MessageObject) ((ObjectMessage) message).getObject();
            System.out.printf("JbossConnector: MessageObject received from JBOSS, %sn", messageObject.getMessageType());

            component.onMessageFromJboss(properties.getProperty("target.sessionID"), messageObject);

        } catch (MessageFormatException exception) {
            logger.error(ExceptionHandler.getFormattedException(exception));
            ExceptionHandler.printException(exception);
        } catch (JMSException exception) {
            ExceptionHandler.printException(exception);
            restart();
        }
    } else {
        System.out.printf("%s: MessageFormatException(Message is not of the format MessageObject)n", this.getClass().getSimpleName());
    }
}
  

Всякий раз, когда я нахожу JMSException , я пытаюсь перезапустить соединение JBoss (контекст, соединение, сеанс, получатель, отправитель). Я сомневаюсь, что прочитанное мной onMessage() использует несколько потоков для получения сообщений из очереди (поправьте меня, если я ошибаюсь).

Когда соединение с очередью JBoss разрывается, будет по крайней мере несколько очередей, которые генерируют это исключение. Это означает, что все они будут пытаться restart() установить соединение, что является пустой тратой времени ( restart() сначала закрывает все соединения, присваивает переменным значение null, а затем пытается инициировать соединения).

Теперь я мог бы сделать что-то вроде

 synchronized (this){
    restart();
}
  

или используйте volatile переменные. Но это не гарантирует, что другие потоки не будут пытаться restart() , когда текущие потоки завершат restart() операцию (снова поправьте меня, если я ошибаюсь).

Есть ли какое-либо решение для выполнения этой работы?

Ответ №1:

Файл onMessage() a MessageListener действительно запускается из собственного потока, поэтому вам понадобятся надлежащие средства управления параллелизмом. Я думаю, что самым простым решением было бы просто использовать java.util.concurrent.atomic.AtomicBoolean . Например, в вашем restart() методе вы могли бы сделать что-то вроде этого:

 private void restart() {
   AtomicBoolean restarting = new AtomicBoolean(false);

   if (!restarting.getAndSet(true)) {
      // restart connection, session, etc.
   }
}
  

Это сделает restart() метод эффективно идемпотентным. Несколько потоков смогут вызывать restart() , но только первый поток, который его вызывает, фактически приведет к воссозданию ресурсов. Все остальные вызовы будут немедленно возвращены.

Комментарии:

1. большое вам спасибо. Я собирался все синхронизировать и создать кучу беспорядка