#java #spring-boot #rabbitmq #spring-amqp #spring-rabbit
#java #spring-boot #rabbitmq #spring-amqp #spring-rabbit
Вопрос:
Я использую RabbitMQ в своем приложении spring boot таким образом:
Отправитель:
rabbitTemplate.convertAndSend("exchange", "routingKey", "Message Text");
Прослушиватель:
@RabbitListener(queues = "queueName")
public void receive(String message) {
System.out.println("start");
//send an http request that takes for example 4 seconds
System.out.println("end");
}
С помощью приведенных выше кодов, когда приложение выполняет часть отправителя, receive
вызывается метод. Моя проблема в том, что пока receive
метод обрабатывает сообщение, если часть отправителя помещает другое сообщение в очередь, метод не обрабатывает новое сообщение, и поэтому второе start
слово не будет напечатано до end
слова предыдущего сообщения. Другими словами, я хочу знать, как прослушиватель сообщений может обрабатывать несколько сообщений одновременно, я не знаю, в чем проблема.
Ответ №1:
Из проблемы, которую вы указываете, похоже, что ваш слушатель настроен на один поток. Обратитесь к документам по настройке прослушивателя контейнеров здесь и здесь, особенно к настройкам параллелизма. Параметры параллелизма определяют, сколько потоков одновременно обрабатывают сообщения в очереди.
Комментарии:
1. Спасибо за ваши ответы. Я нашел
setConcurrentConsumers()
метод. Но этот метод создает статическое количество потоков. Есть ли какой-нибудь способ сделать это динамически? Я имею в виду, что spring обрабатывает это и добавляет поток по требованию .2. @hamed Вы должны стремиться обрабатывать это статически. По умолчанию ваша JVM запускает только столько доступных потоков. Когда вы входите в область параллелизма, вы начинаете осознавать эти компоненты, связанные с JVM.
3. Вы можете предоставить «maxConcurrentConsumers» для обеспечения верхней границы, а контейнер слушателя динамически регулирует количество потоков в зависимости от нагрузки. Также у вас есть варианты предоставить своего собственного исполнителя. В документах очень хорошо объясняется алгоритм динамического увеличения / уменьшения потребителя. Однако вы должны иметь в виду вычислительную мощность, доступную потребителям, и вид работы, которую выполняют потребители — привязанный к процессору или связанный с вводом-выводом. Как правило, если ваша рабочая нагрузка связана с вводом-выводом, вы можете предоставить больше потоков, чем количество процессоров. Неограниченный номер потребителя в любом случае не является хорошей идеей.
Ответ №2:
Если вы используете spring boot, просто добавьте эту конфигурацию в свойства приложения:
# Minimum number of listener invoker threads
spring.rabbitmq.listener.simple.concurrency=5
И ваш слушатель начнет принимать сообщения параллельно (несколько потоков). Существуют и другие конфигурации, которые вы также можете проверить. Например, максимальное количество потоков вызывающего прослушивателя (см. Документ spring boot для получения дополнительной информации).