Сетевое соединение между двумя встроенными брокерами для обмена тематическими сообщениями

#jms #activemq

#jms #activemq

Вопрос:

Я пытаюсь создать сеть встроенных брокеров, топология, которую я пытаюсь достичь, выглядит следующим образом:

введите описание изображения здесь

Итак, я хочу запустить брокера 1, который будет первоначальным получателем всех сообщений и содержать тему%X%. Затем я хочу подключить брокера 2 и брокера 3 к брокеру 1 и заставить их прослушивать брокера 1 через сетевое соединение. В конце концов, я хочу разрешить потребителям получать сообщения от %X% topic, подключаясь к брокеру 2 и брокеру 3.

До сих пор я написал следующий код:

Брокер 1:

 BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61616");
broker.addNetworkConnector("static:(tcp://localhost:61616)");
broker.start();
  

Брокер 2:

 BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61617");
broker.addNetworkConnector("static:(tcp://localhost:61616)");
broker.start();
  

Брокер 3:

 BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61618");
broker.addNetworkConnector("static:(tcp://localhost:61616)");
broker.start();
  

Производитель:

 public class Producer {

    private Connection connection;

    public Producer() throws JMSException {
        // Create a ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://localhost:61616");

        connection = connectionFactory.createConnection();
        connection.start();
        ....

    }

    public void produceMessage(int x) {
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic("Testtopic");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            String text = "Hello world "   x   "! From: "   Thread.currentThread().getName()   " : "   this.hashCode();
            TextMessage message = session.createTextMessage(text);
            System.out.println("Sent message: "  message.hashCode()   " : "   Thread.currentThread().getName());

            producer.send(message);
            session.close();
        }
         ......
    }
}
  

Потребитель:

 public class Consumer {
    public Consumer() throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://localhost:61618"); // BROKER 3
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(session.createTopic("Testtopic"));
        consumer.setMessageListener(new HelloMessageListener());

    }

    private static class HelloMessageListener implements MessageListener {
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("Consumer "   Thread.currentThread().getName()   " received message: "   textMessage.getText());
              .......
        }

    }
}
  

Однако, когда я подключаюсь к tcp: // localhost: 61618 (который является брокером 3) от потребителя, я не могу получить никакого сообщения. В то же время, если подключиться напрямую к
tcp: //localhost: 61616
(начальный получатель, брокер 1), потребитель получает сообщения, и все идет хорошо. Я думаю, что я что-то пропустил в конфигурации соединителей. Не могли бы вы помочь мне с этим?

Спасибо,

Приветствия

Комментарии:

1. Становятся ли сетевые мосты активными? Какие-либо сообщения журнала от ActiveMQ?

2. @MattPavlovich он отправляет исключение в потоке «main» javax.jms.JMSException: Тайм-аут согласования проводного формата: одноранговый узел не отправил свой проводной формат.

Ответ №1:

Похоже, что networkConnectors имеют неправильный порт в uri.

должно быть:

На broker1: .addNetworkConnector(tcp: // localhost:61617) .addNetworkConnector(tcp: // localhost: 61618)

Вам не нужны сетевые разъемы с 2, 3 обратно на 1.

Я также предлагаю настроить полный объект и добавить некоторые параметры для поддержки конфигурации… duplex = «false», networkTTL = 1 и т.д.

Комментарии:

1. Спасибо за твои усилия, Мэтт! Я получаю следующее сообщение: Исключение в потоке «main» javax.jms.JMSException: Тайм-аут согласования проводного формата: одноранговый узел не отправил свой проводной формат. Я думаю, мне следует добавить некоторый код на стороне Broker2, чтобы заставить эту схему работать. Я имею в виду некоторый код, который явно указывает Broker2, что между Broker1 и Broker2 существует сетевой канал. Изучит проблему