Транспорт ActiveMQ: tcp: состояние выполнения потока — слишком много зависших потоков

#java #jms #activemq #message-queue #producer-consumer

#java #jms #activemq #очередь сообщений #производитель-потребитель

Вопрос:

Приведенная ниже реализация ActiveMQ присутствует в коде. Иногда система перестает работать и становится очень медленной. Когда я проверил дамп потока с помощью JavaMelody — я увидел, что слишком много потоков находятся в состоянии выполнения в течение длительного времени и не завершаются.

Версия ActiveMQ — activemq-all-5.3.0.jar

Пожалуйста, обратитесь к приведенному ниже коду :

Брокер :

 public class ActiveMQ extends HttpServlet {

private static final long serialVersionUID = -1234568008764323456;
private static final Logger logger = Logger.getLogger(ActiveMQ.class.getName());
public Listener listener;

private String msgBrokerUrl = "tcp://localhost:61602";
public BrokerService broker = null;
public TransportConnector connector = null;

@Override
public void init() throws ServletException {

    try {
        broker = new BrokerService();
        broker.setPersistent(false);
        broker.setUseJmx(false);
        connector = broker.addConnector(msgBrokerUrl);
        broker.setUseShutdownHook(true);
        System.out.println("BROKER LOADED");
        broker.start();
        broker.deleteAllMessages();

        listener = new Listener();

    } catch (Exception e) {
        e.printStackTrace();
    }
}
  

}

Прослушиватель:

 public class Listener implements MessageListener {

private String msgQueueName = "jms/queue/MessageQueue";
public Session session;
public Destination adminQueue;

public static String id;

public ActiveMQConnection connection;
public MessageConsumer consumer = null;

public Listener() {
    try {

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                new URI("failover://("   "tcp://localhost:61602"   "?wireFormat.cacheEnabled=false"
                          "amp;wireFormat.maxInactivityDuration=0amp;wireFormat.tightEncodingEnabled=true)?maxReconnectDelay=1000"));

        connection = (ActiveMQConnection) connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        adminQueue = session.createQueue(msgQueueName);
        id = new Timestamp(new Date().getTime()).toString();
        consumer = this.session.createConsumer(this.adminQueue, "ID='"   id   "'");
        consumer.setMessageListener(this);
    } catch (JMSException e) {
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

@SuppressWarnings("unchecked")
@Override
public void onMessage(Message message) {
    TextMessage msg = (TextMessage) message;
    try {
        String xmlMsg = msg.getText();
        // business logic
    } catch (JMSException ex) {
        ex.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
  

Производитель :

 public class Producer {
private static String url = "tcp://localhost:61602";
private static String msgQueueName = "jms/queue/MessageQueue";

public ConnectionFactory connectionFactory = null;
public Connection connection = null;
public Session session = null;
public Destination destination = null;

public Producer() {
    connectionFactory = new ActiveMQConnectionFactory(url);
}

public void sendResponse(String xml, DataBean objDataBean) {
    sendToQueue(xml, msgQueueName, objDataBean);
}

private void sendToQueue(String xml, String msgQueueName, DataBean obj) {

    try {
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(msgQueueName);
        MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage(xml);
        message.setJMSExpiration(1000);
        message.setStringProperty(obj.getMsgKey(), obj.getMsgValue());
        producer.send(message);

        xml = null;
        session.close();
        connection.close();

    } catch (Exception e) {
        e.printStackTrace();
    }
}

public static void main(String[] args) {

    for (int msg = 0; msg < 20; msg  ) {
        DataBean obj = getData();
        new Producer().sendResponse(xml, obj);
        ;
    }
}
  

}

Сведения об исключении зависших потоков :

Тип 1 :

 ActiveMQ Transport: tcp:///127.0.0.1:41818
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:152)
java.net.SocketInputStream.read(SocketInputStream.java:122)
org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
java.io.DataInputStream.readInt(DataInputStream.java:387)
org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:272)
org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210)
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202)
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
java.lang.Thread.run(Thread.java:745)
  

Тип 2 :

 ActiveMQ Transport: tcp://localhost/127.0.0.1:61602
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:152)
java.net.SocketInputStream.read(SocketInputStream.java:122)
org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
java.io.DataInputStream.readInt(DataInputStream.java:387)
org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:272)
org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210)
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202)
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
java.lang.Thread.run(Thread.java:745)
  

Пожалуйста, не могли бы вы дать несколько советов по этой проблеме для дальнейшего изучения.

Редактировать: я прочитал несколько сообщений в Интернете и пришел к выводу, что я должен обновить jar-файл activemq и реализовать тайм-аут, но когда я начал читать о настройке тайм-аута, я запутался, следует ли мне устанавливать тайм-аут в producer и consumer или failover или в message или broker service. Тайм-аут в каждом компоненте имеет разную цель, тогда как я должен реализовать тайм-аут, учитывая приведенный выше код и исключение.

Комментарии:

1. Все, что вы описали, является нормальным. Если у вас слишком много выполняемых потоков, это означает, что у вас слишком много потоков для имеющейся у вас мощности процессора. Примечание: ожидание потока socketRead0 означает, что потоку нечего делать (он ожидает, пока другой отправитель что-то напишет)

2. Почему генерируется слишком много номеров портов — я имею в виду, что для каждого потока существует уникальный номер порта.

3. tcp://localhost/ 127.0.0.1: 61602 — Это снова вызывает озабоченность.

4. Каждый сокет имеет свой собственный номер порта в соответствии со спецификацией TCP / IP.

5. javaeesupportpatterns.blogspot.in/2011/04/…

Ответ №1:

Создание соединения обходится очень дорого, и когда вы его закрываете, порт сохраняется до 3 минут, чтобы гарантировать его чистое завершение.

Вы хотите создавать соединения только тогда, когда вам действительно нужно избежать проблем с производительностью. Я предлагаю вам создать соединение один раз и поддерживать это соединение открытым, если вы не получите сообщение об ошибке. Это может повысить производительность на 2-3 порядка.

Это хороший шаблон настройки производительности, который применяется во многих случаях;

  • создавайте и уничтожайте дорогостоящие ресурсы только тогда, когда это действительно необходимо.
  • операции, которые вы выполняете много раз, должны быть сведены к минимуму. т. е. выполняйте повторно как можно меньше.

Комментарии:

1. Согласно приведенному выше коду, соединение с прослушивателем создается только один раз и продолжает выполняться, когда метод onMessage вызывается непрерывно, но соединение с производителем создается для каждого запуска цикла из основного метода. Производитель это проблема, из-за которой мне придется создавать соединение только один раз.. пожалуйста, сообщите.