#jms #activemq
#jms #activemq
Вопрос:
Я пытаюсь создать сеть встроенных брокеров, топология, которую я пытаюсь достичь, выглядит следующим образом:
Итак, я хочу запустить брокера 1, который будет первоначальным получателем всех сообщений и содержать тему%X%. Затем я хочу подключить брокера 2 и брокера 3 к брокеру 1 и заставить их прослушивать брокера 1 через сетевое соединение. В конце концов, я хочу разрешить потребителям получать сообщения от %X% topic, подключаясь к брокеру 2 и брокеру 3.
До сих пор я написал следующий код:
Брокер 1:
BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61616");
broker.addNetworkConnector("static:(tcp://localhost:61616)");
broker.start();
Брокер 2:
BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61617");
broker.addNetworkConnector("static:(tcp://localhost:61616)");
broker.start();
Брокер 3:
BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61618");
broker.addNetworkConnector("static:(tcp://localhost:61616)");
broker.start();
Производитель:
public class Producer {
private Connection connection;
public Producer() throws JMSException {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://localhost:61616");
connection = connectionFactory.createConnection();
connection.start();
....
}
public void produceMessage(int x) {
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("Testtopic");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
String text = "Hello world " x "! From: " Thread.currentThread().getName() " : " this.hashCode();
TextMessage message = session.createTextMessage(text);
System.out.println("Sent message: " message.hashCode() " : " Thread.currentThread().getName());
producer.send(message);
session.close();
}
......
}
}
Потребитель:
public class Consumer {
public Consumer() throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://localhost:61618"); // BROKER 3
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createTopic("Testtopic"));
consumer.setMessageListener(new HelloMessageListener());
}
private static class HelloMessageListener implements MessageListener {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("Consumer " Thread.currentThread().getName() " received message: " textMessage.getText());
.......
}
}
}
Однако, когда я подключаюсь к tcp: // localhost: 61618 (который является брокером 3) от потребителя, я не могу получить никакого сообщения. В то же время, если подключиться напрямую к
tcp: //localhost: 61616 (начальный получатель, брокер 1), потребитель получает сообщения, и все идет хорошо. Я думаю, что я что-то пропустил в конфигурации соединителей. Не могли бы вы помочь мне с этим?
Спасибо,
Приветствия
Комментарии:
1. Становятся ли сетевые мосты активными? Какие-либо сообщения журнала от ActiveMQ?
2. @MattPavlovich он отправляет исключение в потоке «main» javax.jms.JMSException: Тайм-аут согласования проводного формата: одноранговый узел не отправил свой проводной формат.
Ответ №1:
Похоже, что networkConnectors имеют неправильный порт в uri.
должно быть:
На broker1: .addNetworkConnector(tcp: // localhost:61617) .addNetworkConnector(tcp: // localhost: 61618)
Вам не нужны сетевые разъемы с 2, 3 обратно на 1.
Я также предлагаю настроить полный объект и добавить некоторые параметры для поддержки конфигурации… duplex = «false», networkTTL = 1 и т.д.
Комментарии:
1. Спасибо за твои усилия, Мэтт! Я получаю следующее сообщение: Исключение в потоке «main» javax.jms.JMSException: Тайм-аут согласования проводного формата: одноранговый узел не отправил свой проводной формат. Я думаю, мне следует добавить некоторый код на стороне Broker2, чтобы заставить эту схему работать. Я имею в виду некоторый код, который явно указывает Broker2, что между Broker1 и Broker2 существует сетевой канал. Изучит проблему