Ожидание сообщений в очереди jms даже после того, как слушатель работает нормально?

#java #spring-mvc #activemq

#java #spring-mvc #activemq

Вопрос:

Я разработал два веб-приложения в качестве производителя и потребителя для своего проекта. Все работает нормально, за исключением того, что некоторые сообщения находятся в состоянии ожидания (я вижу в браузере / пользовательском интерфейсе) даже после того, как мой слушатель использует их все.

Пример: — Производитель отправил 10 тыс., потребитель получил 10 тыс., но я все еще вижу около ~ 1 тыс. сообщений, оставшихся в состоянии ожидания. И если я нажму опцию очистки в пользовательском интерфейсе, мой слушатель снова прослушает эти ожидающие сообщения.

Сокращение политики предварительной выборки уменьшает ожидающие сообщения, но влияет на производительность моего потребителя.

Код производителя :

     @Bean
    public ActiveMQConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(env.getProperty(JioTUConstant.SYS_JMS_BROKER_URL));
        connectionFactory.setUseAsyncSend(new Boolean(env.getProperty(JioTUConstant.SYS_JMS_USE_ASYNC_SEND)));
        connectionFactory.setAlwaysSessionAsync(new Boolean(env.getProperty(JioTUConstant.SYS_JMS_ALWAYS_SESSION_ASYNC)));
        connectionFactory.setOptimizeAcknowledge(new Boolean(env.getProperty(JioTUConstant.SYS_JMS_OPTIMIZE_ACK)));
        connectionFactory.setUserName(env.getProperty(JioTUConstant.SYS_JMS_USERNAME));
        connectionFactory.setPassword(env.getProperty(JioTUConstant.SYS_JMS_PASSWORD));
        return connectionFactory;
    }

    @Bean
    public CachingConnectionFactory cachingConnectionFactory(){
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setTargetConnectionFactory(connectionFactory());
        cachingConnectionFactory.setSessionCacheSize(new Integer(env.getProperty(JioTUConstant.SYS_JMS_SESSION_CACHE_SIZE)));
        cachingConnectionFactory.setReconnectOnException(new Boolean(env.getProperty(JioTUConstant.SYS_JMS_RECONNECT_ON_EXCEPTION)));
        return cachingConnectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate(){
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setDeliveryMode(new Integer(env.getProperty(JioTUConstant.SYS_JMS_DELIVERY_MODE)));//DeliveryMode.NON_PERSISTENT=1 and DeliveryMode.PERSISTENT=2
        jmsTemplate.setSessionAcknowledgeMode(new Integer(env.getProperty(JioTUConstant.SYS_JMS_SESSION_ACK_MODE)));//Session.AUTO_ACKNOWLEDGE=1,Session.CLIENT_ACKNOWLEDGE=2,Session.DUPS_OK_ACKNOWLEDGE=3 and Session.SESSION_TRANSACTED=0
        jmsTemplate.setSessionAcknowledgeModeName(env.getProperty(JioTUConstant.SYS_JMS_SESSION_ACK_MODE_NAME));
        jmsTemplate.setSessionTransacted(new Boolean(env.getProperty(JioTUConstant.SYS_JMS_SESSION_TRANSACTED)));
        jmsTemplate.setDefaultDestinationName(env.getProperty(JioTUConstant.SYS_JMS_LOG_QUEUE_NAME));
        jmsTemplate.setConnectionFactory(cachingConnectionFactory());
        return jmsTemplate;
    }
 

Потребительский код:

     @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConcurrency(env.getProperty(JioTUConstant.SYS_JMS_CONCURRENT_LISTENER));
        factory.setConnectionFactory(cachingConnectionFactory());
        return factory;
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(env.getProperty(JioTUConstant.SYS_JMS_BROKER_URL));
        connectionFactory.setAlwaysSessionAsync(new Boolean(env.getProperty(JioTUConstant.SYS_JMS_ALWAYS_SESSION_ASYNC)));
        connectionFactory.setOptimizeAcknowledge(new Boolean(env.getProperty(JioTUConstant.SYS_JMS_OPTIMIZE_ACK)));
        connectionFactory.setUserName(env.getProperty(JioTUConstant.SYS_JMS_USERNAME));
        connectionFactory.setPassword(env.getProperty(JioTUConstant.SYS_JMS_PASSWORD));
        return connectionFactory;
    }

    @Bean
    public CachingConnectionFactory cachingConnectionFactory(){
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setSessionCacheSize(new Integer(env.getProperty(JioTUConstant.SYS_JMS_SESSION_CACHE_SIZE)));
        cachingConnectionFactory.setReconnectOnException(new Boolean(env.getProperty(JioTUConstant.SYS_JMS_RECONNECT_ON_EXCEPTION)));
        cachingConnectionFactory.setTargetConnectionFactory(connectionFactory());
        return cachingConnectionFactory;
    }
 

ActiveMQ 5.14.0

Комментарии:

1. ребята, пожалуйста ….. у меня настоящие проблемы ….: P

2. если оно остается без ответа, мне, возможно, придется переключиться на очередь tibco n я все равно не могу найти, чтобы создать CachingConnectionFactory для tibco или аналогичного кода. Поэтому, пожалуйста, пожалуйста.

3. Такая же проблема наблюдается в версии ActiveMQ 5.13.0. Итак, теперь я сомневаюсь в себе. Возможно, я пропустил некоторые основные.