Установить максимальное количество пользователей jms solace

#java #spring #jms #solace

#java #весна #jms #утешение

Вопрос:

Я пытаюсь установить максимальное количество пользователей конечной точки темы с помощью jms, используя solace в качестве посредника, поэтому для увеличения нагрузки в cloudfoundry можно запускать несколько экземпляров приложения, и несколько подписчиков могут получать сообщения одной и той же темы.

Я пробовал несколько комбинаций приведенных ниже настроек ( setConcurrency(), setConcurrentConsumers(), setMaxConcurrentConsumers() , (20 как произвольное большое число). Судя по документации, мне определенно нужно использовать setMaxConcurrentConsumers() и установить для этого достаточно высокое значение.

Когда я развертываю приложение, создается конечная точка темы, но когда я смотрю на интерфейс управления solace, максимальное количество пользователей всегда равно 1 (как видно здесь: Queues -> Topic Endpoints -> select endpoint -> Configured Limit ), хотя оно должно быть 20. Таким образом, второй потребитель не может подключиться. Я не хочу устанавливать это вручную каждый раз при развертывании приложения.

Скриншот моего интерфейса управления solace

 import javax.jms.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

@Configuration
public class ProducerConfiguration {

    private static final Log logger = LogFactory.getLog(SolaceController.class);

    @Value("${durable_subscription}")
    private String subscriptionName;

    @Value("${topic_name}")
    private String topic_name;

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public JmsTemplate jmsTemplate() {
        CachingConnectionFactory ccf = new CachingConnectionFactory(connectionFactory);
        JmsTemplate jmst = new JmsTemplate(ccf);
        jmst.setPubSubDomain(true);
        return jmst;
    }

    @Bean
    public Session configureSession(ConnectionFactory connectionFactory) throws JMSException {
        return connectionFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
    }


    private TextMessage lastReceivedMessage;

    public class SimpleMessageListener implements MessageListener {
        @Override
        public void onMessage(Message message) {

            if (message instanceof TextMessage) {
                lastReceivedMessage = (TextMessage) message;
                try {
                    logger.info("Received message : "   lastReceivedMessage.getText());
                } catch (JMSException e) {
                    logger.error("Error getting text of the received TextMessage: "   e);
                }
            } else {
                logger.error("Received message that was not a TextMessage: "   message);
            }
        }
    }

    @Bean
    public DefaultMessageListenerContainer orderMessageListenerContainer() {

        DefaultMessageListenerContainer lc = new DefaultMessageListenerContainer();
        lc.setConnectionFactory(connectionFactory);
        lc.setDestinationName(topic_name);
        lc.setMessageListener(new SimpleMessageListener());
        lc.setDurableSubscriptionName(subscriptionName);
        lc.setPubSubDomain(true);

        //tried multiple combinations here, also setting only setMaxConcurrentConsumers
        lc.setConcurrency("2-20");
        lc.setConcurrentConsumers(20);
        lc.setMaxConcurrentConsumers(20);

        lc.setSubscriptionDurable(true);
        lc.initialize();
        lc.start();
        return lc;
    }
}
  

Комментарии:

1. изменится ли это, если вы закомментируете setSubscriptionDurable(true) ? Может быть , вы всегда используете одно и то же имя подписки , и Solace это не нравится ?

2. Только что попробовал это. Создаются четыре конечные точки темы (для одной и той же темы) в случае двух экземпляров приложения, что для меня нормально. Также в выводе журнала отсутствуют ошибки, связанные с тем, что абоненты не могут подключиться. Все они, конечно, недолговечные, но мне нужны долгосрочные подписки, обойти это абсолютно невозможно. Ни одно сообщение не должно быть потеряно.

Ответ №1:

Я думаю, что для вашего варианта использования ваш потребитель застрял в очередях. Видишь https://solace.com/blog/topic-subscription-queues /

«… в то время как несколько потребителей могут привязываться к очередям, долгосрочные конечные точки ограничены подпиской на одну тему. Очереди разрешают несколько подписок на темы, а также шаблоны тем.»

Если вы не хотите менять своего издателя, вы можете попробовать «Подписка на темы в очередях». То есть очередь может быть настроена для прослушивания темы. И тогда ваши потребители будут получать сообщения из этой очереди.

Комментарии:

1. Итак, покопавшись, я выяснил, что это функция, специфичная для solace, поэтому для подписки на очередь к теме необходим сеанс, специфичный для solace … но я стараюсь быть максимально независимым от брокера, чтобы при необходимости я мог легко переключить его позже. К сожалению, я не включил это требование в вопрос.

2. да, хорошая идея быть независимым от брокера. В конце концов, это JMS. Итак, как насчет использования очередей?

3. Но что, если у меня есть производитель сообщений, который публикует какое-то сообщение и не хочет заботиться о том, в какую очередь его точно поместить? Он просто хочет опубликовать это, объявить, о чем это (тема), и тот, кто подписывается на эту тему, может ее захватить. Я говорю об основании довольно сложной мультисервисной системы. Я не вижу очередей, подходящих для этого варианта использования. Или вы?

4. ну, тогда как насчет использования недолговечных тем и транзакций? Кроме того, при транзакционных публикациях и потреблениях вы не рискуете потерять сообщения.

5. Хорошо, спасибо за совет. Я пробовал это, в итоге получаю две конечные точки темы, если у меня есть два экземпляра, и сообщение будет отправлено в оба экземпляра, что не то, что я хочу, мне нужен только один экземпляр для получения сообщения.

Ответ №2:

Вам необходимо создать неисключительную очередь / конечную точку.

По умолчанию создаваемая вами очередь является эксклюзивными очередями / конечными точками, что означает, что только один подписчик может привязаться к ней в любое время.

Самый простой способ создать такую очередь / конечную точку — через интерфейс командной строки Solace.

Чтобы создать неисключительную очередь в вашей программе JMS, вы должны перейти к конкретной реализации Solace JMS, подобной этой:

     if (queueName != null) {
        EndpointProperties props = new EndpointProperties();
        props.setAccessType(EndpointProperties.ACCESSTYPE_NONEXCLUSIVE);

        try {
            ((SolConnection)connection).getProperties().getJCSMPSession()
                .provision(JCSMPFactory.onlyInstance().createQueue(queueName), props, 0L);
        } catch (Exception e) {
            e.printStackTrace();
        }

        queue = session.createQueue(queueName);
    }
  

Комментарии:

1. Спасибо за ваш ответ. Одно очень важное требование, как также указано в вопросе, заключается в том, что все должно происходить программно, поэтому не следует настраивать CLI.

2. Я создал неисключительную очередь, зайдя внутрь JCSMP… Я обновлю ответ выше.

3. Привет, Дэвид, большое тебе спасибо за то, что вернулся! Не могли бы вы предоставить полностью рабочий пример кода, который вы использовали для запуска этого фрагмента? И более конкретно: как подключить эту вновь созданную очередь к теме, которую я создал с помощью кода в моем вопросе. Также я не уверен, что connection вы ссылаетесь на SolConnection в строке 6?