Повторная доставка ActiveMQ 5 не работает после отключения от сети

#java #activemq #spring-jms

#java #activemq #spring-jms

Вопрос:

Фреймворки:

  • Java 1.7.0_191 и 1.8.0_181
  • Весна 4.3.18.ВЫПУСК
  • ActiveMQ 5.14.5

Сценарий:

Два клиента и сервер. Клиент 1 теряет соединение (из-за отключения от сети). Сообщение отправляется и потребляется клиентом 2. Клиент 1 восстанавливает сетевое соединение. Теперь я ожидал повторной доставки сообщения клиенту 1. Хотя клиент 1 теперь работает нормально (получает все новые сообщения), существует пробел, из-за которого клиент 1 не получил сообщения об обновлении, поэтому клиенту больше нельзя доверять.

Это специально или я что-то неправильно настроил?

ServerBroker:

 final String brokerURI = String.format("broker://(tcp://%s:%s)?brokerName=clientBroker", host, port);
final BrokerService brokerService = BrokerFactory.createBroker(brokerURI);
brokerService.setUseJmx(true);
brokerService.setDataDirectory(dataDirectory);
final LoggingBrokerPlugin loggingBrokerPlugin = new LoggingBrokerPlugin();
loggingBrokerPlugin.setLogConsumerEvents(true);
loggingBrokerPlugin.setLogProducerEvents(true);
brokerService.setPlugins(new BrokerPlugin[] { loggingBrokerPlugin });
brokerService.start();
  

ServerProducer:

 final JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setPubSubDomain(true);
jmsTemplate.setExplicitQosEnabled(true);
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
jmsTemplate.setTimeToLive(600_000L);
final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setTrustedPackages(ImmutableList.of("my.package", "java"));
activeMQConnectionFactory.setBrokerURL(String.format("tcp://%s:%s", host, port));
jmsTemplate.setConnectionFactory(new PooledConnectionFactory(activeMQConnectionFactory));
jmsTemplate.convertAndSend(JmsQueueConstants.SERVER_UPDATE, new NotificationQueueEntry());
  

Клиент-потребитель:

 @JmsListener(destination = JmsQueueConstants.SERVER_UPDATE)
public void receive(final NotificationQueueEntry msg) {
    process(msg);
}

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(jmsConnectionFactory());
    factory.setPubSubDomain(true);
    factory.setSessionTransacted(true);
    factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
    factory.setConcurrency("2");
    return factory;
}

private ConnectionFactory jmsConnectionFactory() {
    final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setTrustedPackages(ImmutableList.of("my.package", "java"));
    activeMQConnectionFactory.setBrokerURL(String.format("failover:(tcp://%s:%s)?jms.closeTimeout=%d", host, port, 600_000));
    final RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(10_000L);
    redeliveryPolicy.setRedeliveryDelay(1_000L);
    redeliveryPolicy.setMaximumRedeliveries(600);
    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
    activeMQConnectionFactory.setTransportListener(new TransportListener() {
        
        @Override
        public void onCommand(final Object command) {
        }
        
        @Override
        public void onException(final IOException error) {
            connected = false;
        }
        
        @Override
        public void transportInterupted() {
            connected = false;
        }
        
        @Override
        public void transportResumed() {
            if (!connected) {
                reconnect();
            }
        }
        
    });
    return new PooledConnectionFactory(activeMQConnectionFactory);
}
  

подключено и переподключение() предназначены только для наглядного отображения отключенного состояния в клиенте, без активного повторного подключения к ActiveMQ.

Журнал:

 2020-09-09 11:39:52,415 [ActiveMQ Transport: tcp:///<client-1-ip>:54049@1079] DEBUG o.a.a.b.T.Transport:241 Transport Connection to: tcp://<client-1-ip>:54049 failed: java.net.SocketException: Connection reset
2020-09-09 11:39:52,416 [ActiveMQ BrokerService[clientBroker] Task-17] DEBUG o.a.a.t.tcp.TcpTransport:549 Stopping transport tcp:///<client-1-ip>:54049@1079
2020-09-09 11:39:52,417 [ActiveMQ BrokerService[clientBroker] Task-17] DEBUG o.a.a.b.TransportConnection:1233 Cleaning up connection resources: tcp://<client-1-ip>:54049
2020-09-09 11:39:56,339 [RMI TCP Connection(310)-<client-2-ip>] INFO m.p.s.DatabaseServiceImplementation:98 saving entity to database
2020-09-09 11:39:56,342 [server-pool-1-thread-16] INFO m.p.c.JmsConnectionFactoryCache:73 Create connection for address: client-2:1079
2020-09-09 11:39:56,343 [server-pool-1-thread-16] INFO m.p.s.ServerProducerImplementation:268 Send message to client-2:1079. Data: NotificationQueueEntry
2020-09-09 11:39:56,343 [ActiveMQ Transport: tcp:///<client-2-ip>:63125@1079] INFO o.a.a.b.u.LoggingBrokerPlugin:257 Removing Producer: ProducerInfo {commandId = 4, responseRequired = true, producerId = ID:client-2-61550-1599640739463-4:5:1:1, destination = null, brokerPath = null, dispatchAsync = false, windowSize = 0, sentCount = 1}
2020-09-09 11:39:56,345 [ActiveMQ BrokerService[clientBroker] Task-17] DEBUG o.a.a.t.tcp.TcpTransport:549 Stopping transport tcp:///<client-2-ip>:63125@1079
2020-09-09 11:39:56,345 [server-pool-1-thread-16] DEBUG o.a.a.util.ThreadPoolUtils:155 Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@f78a38[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
2020-09-09 11:39:56,346 [server-pool-1-thread-16] DEBUG o.a.a.t.tcp.TcpTransport:549 Stopping transport tcp://client-2/<client-2-ip>:1079@63125
2020-09-09 11:39:56,348 [server-pool-1-thread-16] DEBUG o.a.a.t.WireFormatNegotiator:82 Sending: WireFormatInfo { version=12, properties={StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, Host=client-2, TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,350 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] DEBUG o.a.a.t.InactivityMonitor:103 Using min of local: WireFormatInfo { version=12, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=12, properties={StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, Host=client-2, TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,350 [ActiveMQ Transport: tcp://client-2/<client-2-ip>:1079@52652] DEBUG o.a.a.t.InactivityMonitor:103 Using min of local: WireFormatInfo { version=12, properties={StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, Host=client-2, TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=12, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp://client-2/<client-2-ip>:1079@52652] DEBUG o.a.a.t.WireFormatNegotiator:130 Received WireFormat: WireFormatInfo { version=12, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] DEBUG o.a.a.t.WireFormatNegotiator:130 Received WireFormat: WireFormatInfo { version=12, properties={StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, Host=client-2, TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp://client-2/<client-2-ip>:1079@52652] DEBUG o.a.a.t.WireFormatNegotiator:137 tcp://client-2/<client-2-ip>:1079@52652 before negotiation: OpenWireFormat{version=12, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] DEBUG o.a.a.t.WireFormatNegotiator:137 tcp:///<client-2-ip>:52652@1079 before negotiation: OpenWireFormat{version=12, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp://client-2/<client-2-ip>:1079@52652] DEBUG o.a.a.t.WireFormatNegotiator:152 tcp://client-2/<client-2-ip>:1079@52652 after negotiation: OpenWireFormat{version=12, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] DEBUG o.a.a.t.WireFormatNegotiator:152 tcp:///<client-2-ip>:52652@1079 after negotiation: OpenWireFormat{version=12, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2020-09-09 11:39:56,354 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] INFO o.a.a.b.u.LoggingBrokerPlugin:192 Adding Producer: ProducerInfo {commandId = 4, responseRequired = true, producerId = ID:client-2-61550-1599640739463-4:6:1:1, destination = null, brokerPath = null, dispatchAsync = false, windowSize = 0, sentCount = 0}
2020-09-09 11:39:56,357 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] INFO o.a.a.b.u.LoggingBrokerPlugin:285 Sending message: ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:client-2-61550-1599640739463-4:6:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:client-2-61550-1599640739463-4:6:1:1, destination = topic://ServerUpdate, transactionId = null, expiration = 1599644996357, timestamp = 1599644396357, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@11eb3c5, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}
2020-09-09 11:39:56,358 [ActiveMQ BrokerService[clientBroker] Task-18] INFO o.a.a.b.u.LoggingBrokerPlugin:428 preProcessDispatch: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:client-2-52500-1599644321472-1:1:1:1, destination = topic://ServerUpdate, message = ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:client-2-61550-1599640739463-4:6:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:client-2-61550-1599640739463-4:6:1:1, destination = topic://ServerUpdate, transactionId = null, expiration = 1599644996357, timestamp = 1599644396357, arrival = 0, brokerInTime = 1599644396358, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@162a6d1, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1936, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}, redeliveryCounter = 0}
2020-09-09 11:39:56,359 [ActiveMQ BrokerService[clientBroker] Task-18] INFO o.a.a.b.u.LoggingBrokerPlugin:436 postProcessDispatch: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:client-2-52500-1599644321472-1:1:1:1, destination = topic://ServerUpdate, message = ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:client-2-61550-1599640739463-4:6:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:client-2-61550-1599640739463-4:6:1:1, destination = topic://ServerUpdate, transactionId = null, expiration = 1599644996357, timestamp = 1599644396357, arrival = 0, brokerInTime = 1599644396358, brokerOutTime = 1599644396359, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@162a6d1, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1936, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}, redeliveryCounter = 0}
2020-09-09 11:39:56,419 [ActiveMQ Transport: tcp:///<client-2-ip>:52501@1079] INFO o.a.a.b.u.LoggingBrokerPlugin:157 Acknowledging message for client ID: ID:client-2-52500-1599644321472-0:1, ID:client-2-61550-1599640739463-4:6:1:1:1
2020-09-09 11:39:56,421 [ActiveMQ Transport: tcp:///<client-2-ip>:52501@1079] DEBUG o.a.a.t.LocalTransaction:48 commit: TX:ID:client-2-52500-1599644321472-1:1:1 syncCount: 1
2020-09-09 11:41:04,215 [ActiveMQ Transport: tcp:///<client-1-ip>:50446@1079] DEBUG o.a.a.t.InactivityMonitor:103 Using min of local: WireFormatInfo { version=12, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=12, properties={CacheSize=1024, ProviderName=ActiveMQ, SizePrefixDisabled=false, TcpNoDelayEnabled=true, PlatformDetails=JVM: 1.7.0_191, 24.191-b08, Oracle Corporation, OS: Windows 10, 10.0, x86, StackTraceEnabled=true, CacheEnabled=true, MaxFrameSize=9223372036854775807, TightEncodingEnabled=true, MaxInactivityDuration=30000, ProviderVersion=3.2.0.0-SNAPSHOT, MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:41:04,215 [ActiveMQ Transport: tcp:///<client-1-ip>:50446@1079] DEBUG o.a.a.t.WireFormatNegotiator:130 Received WireFormat: WireFormatInfo { version=12, properties={CacheSize=1024, ProviderName=ActiveMQ, SizePrefixDisabled=false, TcpNoDelayEnabled=true, PlatformDetails=JVM: 1.7.0_191, 24.191-b08, Oracle Corporation, OS: Windows 10, 10.0, x86, StackTraceEnabled=true, CacheEnabled=true, MaxFrameSize=9223372036854775807, TightEncodingEnabled=true, MaxInactivityDuration=30000, ProviderVersion=3.2.0.0-SNAPSHOT, MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:41:04,215 [ActiveMQ Transport: tcp:///<client-1-ip>:50446@1079] DEBUG o.a.a.t.WireFormatNegotiator:137 tcp:///<client-1-ip>:50446@1079 before negotiation: OpenWireFormat{version=12, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2020-09-09 11:41:04,216 [ActiveMQ Transport: tcp:///<client-1-ip>:50446@1079] DEBUG o.a.a.t.WireFormatNegotiator:152 tcp:///<client-1-ip>:50446@1079 after negotiation: OpenWireFormat{version=12, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
  

Ответ №1:

Это ожидаемое поведение для подписчика темы с длительным сроком действия. Когда такой подписчик не подключен, он не будет получать никаких сообщений, отправленных брокеру.

Также стоит отметить, что это не то, что обычно называют «повторной доставкой». Повторная доставка сообщения — это то, что происходит, когда, например, сообщение используется в транзакции, и эта транзакция откатывается, а затем сообщение повторно доставляется клиенту для повторной попытки.

Комментарии:

1. Кажется, я неправильно понял «повторную доставку». Использование постоянных подписчиков сделало свое дело.