spring rabbitmq — использует несколько сообщений одновременно

#java #spring-boot #rabbitmq #spring-amqp #spring-rabbit

#java #spring-boot #rabbitmq #spring-amqp #spring-rabbit

Вопрос:

Я использую RabbitMQ в своем приложении spring boot таким образом:

Отправитель:

 rabbitTemplate.convertAndSend("exchange", "routingKey", "Message Text");
 

Прослушиватель:

 @RabbitListener(queues = "queueName")
public void receive(String message) {
    System.out.println("start");  
    //send an http request that takes for example 4 seconds 
    System.out.println("end");  
}
 

С помощью приведенных выше кодов, когда приложение выполняет часть отправителя, receive вызывается метод. Моя проблема в том, что пока receive метод обрабатывает сообщение, если часть отправителя помещает другое сообщение в очередь, метод не обрабатывает новое сообщение, и поэтому второе start слово не будет напечатано до end слова предыдущего сообщения. Другими словами, я хочу знать, как прослушиватель сообщений может обрабатывать несколько сообщений одновременно, я не знаю, в чем проблема.

Ответ №1:

Из проблемы, которую вы указываете, похоже, что ваш слушатель настроен на один поток. Обратитесь к документам по настройке прослушивателя контейнеров здесь и здесь, особенно к настройкам параллелизма. Параметры параллелизма определяют, сколько потоков одновременно обрабатывают сообщения в очереди.

Комментарии:

1. Спасибо за ваши ответы. Я нашел setConcurrentConsumers() метод. Но этот метод создает статическое количество потоков. Есть ли какой-нибудь способ сделать это динамически? Я имею в виду, что spring обрабатывает это и добавляет поток по требованию .

2. @hamed Вы должны стремиться обрабатывать это статически. По умолчанию ваша JVM запускает только столько доступных потоков. Когда вы входите в область параллелизма, вы начинаете осознавать эти компоненты, связанные с JVM.

3. Вы можете предоставить «maxConcurrentConsumers» для обеспечения верхней границы, а контейнер слушателя динамически регулирует количество потоков в зависимости от нагрузки. Также у вас есть варианты предоставить своего собственного исполнителя. В документах очень хорошо объясняется алгоритм динамического увеличения / уменьшения потребителя. Однако вы должны иметь в виду вычислительную мощность, доступную потребителям, и вид работы, которую выполняют потребители — привязанный к процессору или связанный с вводом-выводом. Как правило, если ваша рабочая нагрузка связана с вводом-выводом, вы можете предоставить больше потоков, чем количество процессоров. Неограниченный номер потребителя в любом случае не является хорошей идеей.

Ответ №2:

Если вы используете spring boot, просто добавьте эту конфигурацию в свойства приложения:

 # Minimum number of listener invoker threads
spring.rabbitmq.listener.simple.concurrency=5
 

И ваш слушатель начнет принимать сообщения параллельно (несколько потоков). Существуют и другие конфигурации, которые вы также можете проверить. Например, максимальное количество потоков вызывающего прослушивателя (см. Документ spring boot для получения дополнительной информации).