Using a Spring pooled connection factory with ActiveMQ Artemis failover to handle message resending

#connection-pooling #spring-jms #activemq-artemis


Question:

I am using a pooled connection factory to connect to an ActiveMQ Artemis high-availability cluster. The code below shows my current implementation.

    @Bean
    public ConnectionFactory jmsConnectionFactory() {
        // brokerUrl holds the HA failover connection string given below
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl, username, password);
        return connectionFactory;
    }

    @Bean
    public JmsPoolConnectionFactory pooledConnectionFactoryOnline() {
        JmsPoolConnectionFactory poolingFactory = new JmsPoolConnectionFactory();
        poolingFactory.setConnectionFactory(jmsConnectionFactory());
        poolingFactory.setMaxConnections(3);
        poolingFactory.setConnectionIdleTimeout(0);

        return poolingFactory;
    }

    @Bean
    public JmsTemplate jmsTemplateOnline() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(pooledConnectionFactoryOnline());
        jmsTemplate.setDefaultDestinationName(QUEUE);
        return jmsTemplate;
    }
 

The pooled connection factory implementation above is org.messaginghub.pooled.jms.JmsPoolConnectionFactory (but I ran into similar problems with org.springframework.jms.connection.CachingConnectionFactory), and the connection string used for failover is (tcp://broker1:61616,tcp://broker2:62616)?ha=true&reconnectAttempts=-1 .
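The failover URL follows the Artemis core client URL syntax. It is a parenthesized, comma-separated list of tcp://host:port entries, followed by query parameters joined with a plain &. Roughly, ha=true lets the client learn about backup servers from the cluster topology so it can fail over to them. reconnectAttempts=-1 asks the client to keep retrying without limit.

This is a small sketch, assuming only the JDK, of one way to assemble such a string from a host list. FailoverUrl and build are hypothetical names. The result is what would be passed as brokerUrl to ActiveMQConnectionFactory.

```java
import java.util.List;
import java.util.StringJoiner;

public class FailoverUrl {

    // Hypothetical helper: builds "(tcp://h1:p1,tcp://h2:p2)?k1=v1&k2=v2"
    static String build(List<String> hostPorts, List<String> params) {
        StringJoiner hosts = new StringJoiner(",", "(", ")");
        for (String hostPort : hostPorts) {
            hosts.add("tcp://" + hostPort);
        }
        // Query parameters are separated by a plain '&'
        return hosts + "?" + String.join("&", params);
    }

    public static void main(String[] args) {
        String url = build(
                List.of("broker1:61616", "broker2:62616"),
                List.of("ha=true", "reconnectAttempts=-1"));
        // prints (tcp://broker1:61616,tcp://broker2:62616)?ha=true&reconnectAttempts=-1
        System.out.println(url);
    }
}
```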

My HA policy configuration for the master broker is shown below:

  <connectors>
    <connector name="broker1-connector">tcp://broker1:61616</connector>
    <connector name="broker2-connector">tcp://broker2:61616</connector>
  </connectors>

  <ha-policy>
    <replication>
      <master>
        <check-for-live-server>true</check-for-live-server>
      </master>
    </replication>
  </ha-policy>

  <cluster-connections>
    <cluster-connection name="myhost1-cluster">
      <connector-ref>broker1-connector</connector-ref>
      <retry-interval>500</retry-interval>
      <use-duplicate-detection>true</use-duplicate-detection>
      <static-connectors>
        <connector-ref>broker2-connector</connector-ref>
      </static-connectors>
    </cluster-connection>
  </cluster-connections>
 

and, correspondingly, for the slave broker:

  <ha-policy>
    <replication>
      <slave>
        <allow-failback>true</allow-failback>
      </slave>
    </replication>
  </ha-policy>
 

The logs for the master broker are shown below:

 
2021-01-24 21:05:56,093 INFO  [org.apache.activemq.artemis.core.server] AMQ221082: Initializing metrics plugin org.apache.activemq.artemis.core.server.metrics.plugins.ArtemisPrometheusMetricsPlugin with properties: {}
2021-01-24 21:05:56,266 INFO  [org.apache.activemq.artemis.integration.bootstrap] AMQ101000: Starting ActiveMQ Artemis Server
2021-01-24 21:05:56,288 INFO  [org.apache.activemq.artemis.core.server] AMQ221000: live Message Broker is starting with configuration Broker Configuration (clustered=true,journalDirectory=data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/large-messages,pagingDirectory=data/paging)
2021-01-24 21:05:58,987 INFO  [org.apache.activemq.artemis.core.server] AMQ221055: There were too many old replicated folders upon startup, removing /var/lib/artemis/data/bindings/oldreplica.94
2021-01-24 21:05:58,994 INFO  [org.apache.activemq.artemis.core.server] AMQ222162: Moving data directory /var/lib/artemis/data/bindings to /var/lib/artemis/data/bindings/oldreplica.96
2021-01-24 21:05:59,001 INFO  [org.apache.activemq.artemis.core.server] AMQ221055: There were too many old replicated folders upon startup, removing /var/lib/artemis/data/journal/oldreplica.94
2021-01-24 21:05:59,058 INFO  [org.apache.activemq.artemis.core.server] AMQ222162: Moving data directory /var/lib/artemis/data/journal to /var/lib/artemis/data/journal/oldreplica.96
2021-01-24 21:05:59,062 INFO  [org.apache.activemq.artemis.core.server] AMQ221055: There were too many old replicated folders upon startup, removing /var/lib/artemis/data/paging/oldreplica.94
2021-01-24 21:05:59,068 INFO  [org.apache.activemq.artemis.core.server] AMQ222162: Moving data directory /var/lib/artemis/data/paging to /var/lib/artemis/data/paging/oldreplica.96
2021-01-24 21:05:59,135 INFO  [org.apache.activemq.artemis.core.server] AMQ221013: Using NIO Journal
2021-01-24 21:05:59,140 WARN  [org.apache.activemq.artemis.core.server] AMQ222007: Security risk! Apache ActiveMQ Artemis is running with the default cluster admin user and default password. Please see the cluster chapter in the ActiveMQ Artemis User Guide for instructions on how to change this.
2021-01-24 21:05:59,149 INFO  [org.apache.activemq.artemis.core.server] AMQ221057: Global Max Size is being adjusted to 1/2 of the JVM max size (-Xmx). being defined as 16,089,350,144
2021-01-24 21:05:59,300 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-server]. Adding protocol support for: CORE
2021-01-24 21:05:59,303 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-amqp-protocol]. Adding protocol support for: AMQP
2021-01-24 21:05:59,305 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-hornetq-protocol]. Adding protocol support for: HORNETQ
2021-01-24 21:05:59,306 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-mqtt-protocol]. Adding protocol support for: MQTT
2021-01-24 21:05:59,306 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-openwire-protocol]. Adding protocol support for: OPENWIRE
2021-01-24 21:05:59,307 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-stomp-protocol]. Adding protocol support for: STOMP
2021-01-24 21:05:59,463 INFO  [org.apache.activemq.artemis.core.server] AMQ221109: Apache ActiveMQ Artemis Backup Server version 2.13.0 [null] started, waiting live to fail before it gets active
2021-01-24 21:05:59,555 INFO  [org.apache.activemq.hawtio.branding.PluginContextListener] Initialized activemq-branding plugin
2021-01-24 21:05:59,638 INFO  [org.apache.activemq.hawtio.plugin.PluginContextListener] Initialized artemis-plugin plugin
2021-01-24 21:06:00,447 INFO  [io.hawt.HawtioContextListener] Initialising hawtio services
2021-01-24 21:06:00,471 INFO  [io.hawt.system.ConfigManager] Configuration will be discovered via system properties
2021-01-24 21:06:00,474 INFO  [io.hawt.jmx.JmxTreeWatcher] Welcome to hawtio 1.5.12 : http://hawt.io/ : Don't cha wish your console was hawt like me? ;-)
2021-01-24 21:06:00,478 INFO  [io.hawt.jmx.UploadManager] Using file upload directory: /var/lib/artemis/tmp/uploads
2021-01-24 21:06:00,501 INFO  [io.hawt.web.AuthenticationFilter] Starting hawtio authentication filter, JAAS realm: "activemq" authorized role(s): "amq" role principal classes: "org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal"
2021-01-24 21:06:00,535 INFO  [io.hawt.web.JolokiaConfiguredAgentServlet] Jolokia overridden property: [key=policyLocation, value=file:/var/lib/artemis/etc/jolokia-access.xml]
2021-01-24 21:06:00,572 INFO  [io.hawt.web.RBACMBeanInvoker] Using MBean [hawtio:type=security,area=jmx,rank=0,name=HawtioDummyJMXSecurity] for role based access control
2021-01-24 21:06:00,824 INFO  [io.hawt.system.ProxyWhitelist] Initial proxy whitelist: [localhost, 127.0.0.1, 172.23.0.7, 42546424839f]
2021-01-24 21:06:01,245 INFO  [org.apache.activemq.artemis] AMQ241001: HTTP Server started at http://0.0.0.0:8161
2021-01-24 21:06:01,245 INFO  [org.apache.activemq.artemis] AMQ241002: Artemis Jolokia REST API available at http://0.0.0.0:8161/console/jolokia
2021-01-24 21:06:01,245 INFO  [org.apache.activemq.artemis] AMQ241004: Artemis Console available at http://0.0.0.0:8161/console
2021-01-24 21:06:03,263 INFO  [org.apache.activemq.artemis.core.server] AMQ221024: Backup server ActiveMQServerImpl::serverUUID=b96ecec9-e13e-11ea-8a4f-0242ac170006 is synchronized with live-server.
2021-01-24 21:06:09,763 INFO  [org.apache.activemq.artemis.core.server] AMQ221031: backup announced
2021-01-24 21:06:09,806 WARN  [org.apache.activemq.artemis.core.client] AMQ212037: Connection failure to broker2/broker2:61616 has been detected: AMQ219015: The connection was disconnected because of server shutdown [code=DISCONNECTED]
2021-01-24 21:06:09,806 WARN  [org.apache.activemq.artemis.core.client] AMQ212037: Connection failure to broker2/broker2:61616 has been detected: AMQ219015: The connection was disconnected because of server shutdown [code=DISCONNECTED]
2021-01-24 21:06:09,875 INFO  [org.apache.activemq.artemis.core.server] AMQ221037: ActiveMQServerImpl::serverUUID=b96ecec9-e13e-11ea-8a4f-0242ac170006 to become 'live'
2021-01-24 21:06:09,897 WARN  [org.apache.activemq.artemis.core.client] AMQ212004: Failed to connect to server.
2021-01-24 21:06:10,553 INFO  [org.apache.activemq.artemis.core.server] AMQ221080: Deploying address DLQ supporting [ANYCAST]
2021-01-24 21:06:10,554 INFO  [org.apache.activemq.artemis.core.server] AMQ221003: Deploying ANYCAST queue DLQ on address DLQ
2021-01-24 21:06:10,555 INFO  [org.apache.activemq.artemis.core.server] AMQ221080: Deploying address ExpiryQueue supporting [ANYCAST]
2021-01-24 21:06:10,555 INFO  [org.apache.activemq.artemis.core.server] AMQ221003: Deploying ANYCAST queue ExpiryQueue on address ExpiryQueue
2021-01-24 21:06:10,803 INFO  [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live
2021-01-24 21:06:10,865 INFO  [org.apache.activemq.artemis.core.server] AMQ221020: Started EPOLL Acceptor at 0.0.0.0:61616 for protocols [CORE,MQTT,AMQP,STOMP,HORNETQ,OPENWIRE]
 

The logs for the slave broker are shown below:

 2021-01-24 21:05:59,975 INFO  [org.apache.activemq.artemis.core.server] AMQ221025: Replication: sending NIOSequentialFile /var/lib/artemis/data/journal/activemq-data-1262.amq (size=10,485,760) to replica.
2021-01-24 21:06:01,346 INFO  [org.apache.activemq.artemis.core.server] AMQ221025: Replication: sending NIOSequentialFile /var/lib/artemis/data/journal/activemq-data-1261.amq (size=10,485,760) to replica.
2021-01-24 21:06:02,253 INFO  [org.apache.activemq.artemis.core.server] AMQ221025: Replication: sending NIOSequentialFile /var/lib/artemis/data/bindings/activemq-bindings-1191.bindings (size=1,048,576) to replica.
2021-01-24 21:06:02,363 INFO  [org.apache.activemq.artemis.core.server] AMQ221025: Replication: sending NIOSequentialFile /var/lib/artemis/data/bindings/activemq-bindings-1196.bindings (size=1,048,576) to replica.
2021-01-24 21:06:02,451 INFO  [org.apache.activemq.artemis.core.server] AMQ221025: Replication: sending NIOSequentialFile /var/lib/artemis/data/bindings/activemq-bindings-1189.bindings (size=1,048,576) to replica.
2021-01-24 21:06:09,756 INFO  [org.apache.activemq.artemis.core.server] AMQ224100: Timed out waiting for large messages deletion with IDs [], might not be deleted if broker crashes atm
2021-01-24 21:06:09,756 INFO  [org.apache.activemq.artemis.core.server] AMQ224100: Timed out waiting for large messages deletion with IDs [], might not be deleted if broker crashes atm
2021-01-24 21:06:09,756 INFO  [org.apache.activemq.artemis.core.server] AMQ224100: Timed out waiting for large messages deletion with IDs [], might not be deleted if broker crashes atm
2021-01-24 21:06:09,756 INFO  [org.apache.activemq.artemis.core.server] AMQ224100: Timed out waiting for large messages deletion with IDs [], might not be deleted if broker crashes atm
2021-01-24 21:06:10,046 INFO  [org.apache.activemq.artemis.core.server] AMQ221002: Apache ActiveMQ Artemis Message Broker version 2.13.0 [b96ecec9-e13e-11ea-8a4f-0242ac170006] stopped, uptime 6 hours 32 minutes
2021-01-24 21:06:10,046 INFO  [org.apache.activemq.artemis.core.server] AMQ221039: Restarting as Replicating backup server after live restart
2021-01-24 21:06:10,050 INFO  [org.apache.activemq.artemis.core.server] AMQ221000: backup Message Broker is starting with configuration Broker Configuration (clustered=true,journalDirectory=data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/large-messages,pagingDirectory=data/paging)
2021-01-24 21:06:10,053 INFO  [org.apache.activemq.artemis.core.server] AMQ221055: There were too many old replicated folders upon startup, removing /var/lib/artemis/data/bindings/oldreplica.101
2021-01-24 21:06:10,059 INFO  [org.apache.activemq.artemis.core.server] AMQ222162: Moving data directory /var/lib/artemis/data/bindings to /var/lib/artemis/data/bindings/oldreplica.103
2021-01-24 21:06:10,060 INFO  [org.apache.activemq.artemis.core.server] AMQ221055: There were too many old replicated folders upon startup, removing /var/lib/artemis/data/journal/oldreplica.101
2021-01-24 21:06:10,110 INFO  [org.apache.activemq.artemis.core.server] AMQ222162: Moving data directory /var/lib/artemis/data/journal to /var/lib/artemis/data/journal/oldreplica.103
2021-01-24 21:06:10,111 INFO  [org.apache.activemq.artemis.core.server] AMQ221055: There were too many old replicated folders upon startup, removing /var/lib/artemis/data/paging/oldreplica.100
2021-01-24 21:06:10,117 INFO  [org.apache.activemq.artemis.core.server] AMQ222162: Moving data directory /var/lib/artemis/data/paging to /var/lib/artemis/data/paging/oldreplica.102
2021-01-24 21:06:10,120 INFO  [org.apache.activemq.artemis.core.server] AMQ221013: Using NIO Journal
2021-01-24 21:06:10,121 WARN  [org.apache.activemq.artemis.core.server] AMQ222007: Security risk! Apache ActiveMQ Artemis is running with the default cluster admin user and default password. Please see the cluster chapter in the ActiveMQ Artemis User Guide for instructions on how to change this.
2021-01-24 21:06:10,124 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-server]. Adding protocol support for: CORE
2021-01-24 21:06:10,127 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-amqp-protocol]. Adding protocol support for: AMQP
2021-01-24 21:06:10,127 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-hornetq-protocol]. Adding protocol support for: HORNETQ
2021-01-24 21:06:10,127 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-mqtt-protocol]. Adding protocol support for: MQTT
2021-01-24 21:06:10,127 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-openwire-protocol]. Adding protocol support for: OPENWIRE
2021-01-24 21:06:10,128 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-stomp-protocol]. Adding protocol support for: STOMP
2021-01-24 21:06:11,138 INFO  [org.apache.activemq.artemis.core.server] AMQ221109: Apache ActiveMQ Artemis Backup Server version 2.13.0 [null] started, waiting live to fail before it gets active
2021-01-24 21:06:14,559 INFO  [org.apache.activemq.artemis.core.server] AMQ221024: Backup server ActiveMQServerImpl::serverUUID=b96ecec9-e13e-11ea-8a4f-0242ac170006 is synchronized with live-server.
2021-01-24 21:06:14,594 INFO  [org.apache.activemq.artemis.core.server] AMQ221031: backup announced
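Read in order, the two logs appear to show a failback rather than a failover:

- The master starts as a backup (AMQ221109) and synchronizes with broker2, which is live at that point (AMQ221024).
- When broker2 shuts down (AMQ212037), the master becomes live (AMQ221037, AMQ221007).
- Broker2 in turn stops (AMQ221002) and restarts as a replicating backup (AMQ221039), which is consistent with allow-failback=true.

This is a minimal JDK-only sketch for pulling such role transitions out of a log by message code. HaTimeline is a hypothetical name. The code table covers only the codes that appear in these logs, so it is illustrative rather than exhaustive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HaTimeline {

    // Role-related message codes taken from the broker logs above (not an exhaustive list)
    static final Map<String, String> ROLE_EVENTS = Map.of(
            "AMQ221109", "backup started, waiting for live to fail",
            "AMQ221024", "backup synchronized with live",
            "AMQ221031", "backup announced",
            "AMQ221037", "backup becoming live",
            "AMQ221007", "server is now live",
            "AMQ221002", "broker stopped",
            "AMQ221039", "restarting as replicating backup");

    // Captures the "date time" prefix and the first AMQ code followed by a colon
    static final Pattern LINE = Pattern.compile("^\\s*(\\S+ \\S+) .*?(AMQ\\d{6}):");

    static List<String> timeline(List<String> logLines) {
        List<String> events = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = LINE.matcher(line);
            // Lines whose first code is not role-related (e.g. AMQ212037) are skipped
            if (m.find() && ROLE_EVENTS.containsKey(m.group(2))) {
                events.add(m.group(1) + " " + ROLE_EVENTS.get(m.group(2)));
            }
        }
        return events;
    }

    public static void main(String[] args) {
        List<String> sample = List.of(
                "2021-01-24 21:06:03,263 INFO  [org.apache.activemq.artemis.core.server] AMQ221024: Backup server ActiveMQServerImpl::serverUUID=b96ecec9-e13e-11ea-8a4f-0242ac170006 is synchronized with live-server.",
                "2021-01-24 21:06:09,806 WARN  [org.apache.activemq.artemis.core.client] AMQ212037: Connection failure to broker2/broker2:61616 has been detected: AMQ219015: The connection was disconnected because of server shutdown [code=DISCONNECTED]",
                "2021-01-24 21:06:09,875 INFO  [org.apache.activemq.artemis.core.server] AMQ221037: ActiveMQServerImpl::serverUUID=b96ecec9-e13e-11ea-8a4f-0242ac170006 to become 'live'",
                "2021-01-24 21:06:10,803 INFO  [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live");
        timeline(sample).forEach(System.out::println);
    }
}
```

Run against the four sample lines, it prints three entries:

- synchronized at 21:06:03,263
- becoming live at 21:06:09,875
- live at 21:06:10,803

The AMQ212037 connection warning is skipped.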
 

When I test failover by stopping the master broker, my client gets a connection exception, which I try to handle so that no messages are lost. I stop the Docker container with docker stop (which first sends SIGTERM to the running container and then, after a grace period, SIGKILL). Since I know all traffic will be redirected to the slave broker, my approach is as follows:

    @Autowired
    JmsPoolConnectionFactory poolFactory;

    try {
        jmsTemplateOnline.convertAndSend(QUEUE, message);
    }
    catch (JmsException e) {
        // The first send failed (e.g. during failover): retry once
        // on a connection obtained from the same pool
        try (Connection connection = poolFactory.createConnection();
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             MessageProducer producer = session.createProducer(new ActiveMQQueue(QUEUE))) {

            producer.send(messageConverter.toMessage(message, session));

        } catch (Exception jmsException) {
            jmsException.printStackTrace();
        }
    }
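The catch block above retries exactly once, on a connection from the same pool. A common generalization is a bounded retry with a delay between attempts, which gives the client time to reconnect to the backup.

This is a minimal JDK-only sketch, assuming the send can be wrapped in a Callable. RetryingSender and sendWithRetry are hypothetical names. In the code above, the action would be something like () -> { jmsTemplateOnline.convertAndSend(QUEUE, message); return null; }.

Retrying after an ambiguous failure, such as a blocking call that never got a response, can deliver the same message twice. Retries alone therefore trade possible loss for possible duplicates.

```java
import java.util.concurrent.Callable;

public class RetryingSender {

    // Hypothetical helper: runs the action up to maxAttempts times, sleeping between failures
    static <T> T sendWithRetry(Callable<T> action, int maxAttempts, long delayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e; // remember the most recent failure
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis); // give the client time to fail over
                }
            }
        }
        throw last; // every attempt failed: surface the last error to the caller
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated send that fails twice (as during failover) and then succeeds
        String result = sendWithRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("simulated connection failure");
            }
            return "sent";
        }, 5, 100);
        System.out.println(result + " after " + calls[0] + " attempts"); // sent after 3 attempts
    }
}
```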
 

Basically, since the pooled connections have failed, my understanding is that they should be discarded and a new connection to the slave broker created, so that obtaining a new connection would get my message sent. What actually happens is:

[Thread-4 (ActiveMQ-client-global-threads)] [WARN ] org.apache.activemq.artemis.core.client - AMQ212037: Connection failure to /broker1:61616 has been detected: AMQ219015: The connection was disconnected because of server shutdown [code=DISCONNECTED]
[Thread-1 (ActiveMQ-client-global-threads)] [WARN ] org.apache.activemq.artemis.core.client - AMQ212037: Connection failure to /broker1:61616 has been detected: AMQ219015: The connection was disconnected because of server shutdown [code=DISCONNECTED]
[Thread-2 (ActiveMQ-client-global-threads)] [WARN ] org.apache.activemq.artemis.core.client - AMQ212037: Connection failure to /broker1:61616 has been detected: AMQ219015: The connection was disconnected because of server shutdown [code=DISCONNECTED]
 

This is the exception I get before trying to resend my message:

 [http-nio-8080-exec-1] [INFO ]  Uncategorized exception occurred during JMS processing; nested exception is javax.jms.JMSException: AMQ219016: Connection failure detected. 
Unblocking a blocking call that will never get a response
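The message above carries AMQ219016, and the later failure carries AMQ219018 ("Producer is closed"). Each is wrapped in outer exceptions, for example Spring's UncategorizedJmsException around a javax.jms.JMSException.

This is a hedged JDK-only sketch of walking a Throwable's cause chain to check for those codes, for instance to decide whether a send is worth retrying. FailoverErrors and isFailoverRelated are hypothetical names. Matching on message text is an illustrative assumption and is brittle across client versions.

```java
import java.util.Set;

public class FailoverErrors {

    // Client-side codes seen in the question: AMQ219016 (connection failure), AMQ219018 (producer closed)
    static final Set<String> FAILOVER_CODES = Set.of("AMQ219016", "AMQ219018");

    // Walks the cause chain (depth-capped as a guard) looking for a known code in any message
    static boolean isFailoverRelated(Throwable t) {
        Throwable current = t;
        for (int depth = 0; current != null && depth < 32; depth++) {
            String msg = current.getMessage();
            if (msg != null) {
                for (String code : FAILOVER_CODES) {
                    if (msg.contains(code)) {
                        return true;
                    }
                }
            }
            current = current.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        // RuntimeException stands in for Spring's wrapper; the cause carries the Artemis code
        Exception wrapped = new RuntimeException(
                "Uncategorized exception occurred during JMS processing",
                new IllegalStateException("AMQ219016: Connection failure detected."));
        System.out.println(isFailoverRelated(wrapped));                       // true
        System.out.println(isFailoverRelated(new RuntimeException("other"))); // false
    }
}
```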
 

Although in some tests I was able to send my message, there were cases where sending failed with the exception below:

 http-nio-8080-exec-1] [INFO ] 
javax.jms.IllegalStateException: AMQ219018: Producer is closed
        at org.apache.activemq.artemis.core.client.impl.ClientProducerImpl.checkClosed(ClientProducerImpl.java:301)
        at org.apache.activemq.artemis.core.client.impl.ClientProducerImpl.send(ClientProducerImpl.java:123)
        at org.apache.activemq.artemis.jms.client.ActiveMQMessageProducer.doSendx(ActiveMQMessageProducer.java:483)
        at org.apache.activemq.artemis.jms.client.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:220)
        at org.messaginghub.pooled.jms.JmsPoolMessageProducer.sendMessage(JmsPoolMessageProducer.java:194)
        at org.messaginghub.pooled.jms.JmsPoolMessageProducer.send(JmsPoolMessageProducer.java:88)
        at org.messaginghub.pooled.jms.JmsPoolMessageProducer.send(JmsPoolMessageProducer.java:77)
...
Caused by: ActiveMQObjectClosedException[errorType=OBJECT_CLOSED message=AMQ219018: Producer is closed]
        ... 108 more
24-01-2021 16:07:27[http-nio-8080-exec-1] [WARN ] o.messaginghub.pooled.jms.JmsPoolSession - Caught exception trying close() when putting session back into the pool, will invalidate. javax.jms.IllegalStateException: Session is closed
javax.jms.IllegalStateException: Session is closed
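The expectation described earlier can be modeled in a few lines: a failed pooled connection is thrown away and replaced by a fresh one on the next request. This is a toy JDK-only model with hypothetical ToyPool and FakeConnection types. It is not how JmsPoolConnectionFactory or CachingConnectionFactory is implemented.

It only shows invalidate-on-failure: a broken connection is dropped, both when it is returned and when it is next borrowed, instead of being handed out again. By contrast, the trace above shows a producer that was already closed being used for a send. The pool only invalidates the session afterwards, when the session is returned.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

public class ToyPool {

    // Hypothetical stand-in for a broker connection that can break during failover
    static final class FakeConnection {
        final int id;
        boolean broken;
        FakeConnection(int id) { this.id = id; }
    }

    private final Deque<FakeConnection> idle = new ArrayDeque<>();
    private final Supplier<FakeConnection> factory;

    ToyPool(Supplier<FakeConnection> factory) { this.factory = factory; }

    // Hand out a healthy idle connection if there is one, otherwise create a new one
    FakeConnection borrow() {
        while (!idle.isEmpty()) {
            FakeConnection c = idle.poll();
            if (!c.broken) {
                return c;
            }
            // a broken connection is discarded here instead of being reused
        }
        return factory.get();
    }

    // Broken connections are invalidated rather than returned to the pool
    void release(FakeConnection c) {
        if (!c.broken) {
            idle.push(c);
        }
    }

    public static void main(String[] args) {
        int[] nextId = {1};
        ToyPool pool = new ToyPool(() -> new FakeConnection(nextId[0]++));

        FakeConnection first = pool.borrow();
        pool.release(first);
        first.broken = true;                 // simulate the live broker going away while idle
        FakeConnection second = pool.borrow();
        System.out.println(first.id + " -> " + second.id); // 1 -> 2
    }
}
```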
 

My main concern is finding a way not to lose any messages during failover. Can you point out what I am doing wrong and how I could handle this case better?

Comments:

1. It looks like you're not actually triggering failover on the broker. How are you killing the live broker to initiate failover? Are you stopping it gracefully or using something like kill -9 <pid>? Also, please paste the <ha-policy> from each broker in the HA pair.

2. I've edited my question to include more information about the HA configuration. As for stopping the broker, I stop the Docker container with docker stop (which first sends SIGTERM to the running container and then, after a grace period, SIGKILL).

3. Are the brokers finding each other correctly and forming an HA pair? Can you paste the startup logs from each?

4. Thanks, I've updated the question with the logs after restarting the master server. There are a few warnings on the master that may be of interest.