#rabbitmq #spring-amqp
#rabbitmq #spring-amqp
Вопрос:
Есть ли способ, с помощью которого мы можем ограничить очередь RabbitMQ отправкой только фиксированного количества сообщений из очереди потребителям?
У меня есть 2 очереди Q1 и Q2 и 10 потребителей.Каждый потребитель может обрабатывать сообщения из Q1 и Q2.At в любой момент времени только 2 потребителя должны обрабатывать сообщения из Q2. Все 10 потребителей могут обрабатывать сообщения из Q1 одновременно.
Есть ли какая-либо конфигурация в RabbitMQ, которую мы можем указать, чтобы RabbitMQ отправлял только 2 сообщения из Q2 любому свободному потребителю и отправлял следующие 2 только после их подтверждения, даже если другие потребители свободны и готовы к использованию.
Дополнительная информация о проблеме:
Почему одновременно обрабатывается только 2 сообщения? : Сообщения Q2 выполняют вызов веб-службы, а конечная точка веб-службы (сторонняя организация) может обслуживать только 2 сообщения одновременно.
Нельзя ли использовать параллелизм? : Если мы используем ListenerContainer (Spring AMQP), контейнер предназначен для каждого потребителя. Мы можем ограничить количество сообщений, которые один потребитель может принимать одновременно, но когда у нас будет 10 потребителей, если в очереди есть сообщения, каждый потребитель получит свою долю.
Можем ли мы настроить только 2 потребителей, слушающих Q2? : Я понимаю, что мы можем достичь этого, настроив только 2 потребителей для Q2, но я пытаюсь избежать этого. Если по какой-либо причине эти 2 потребителя выйдут из строя, обработка Q2 будет остановлена. Если настроено 10 потребителей, мы можем гарантировать, что обработка будет выполняться до тех пор, пока не отключится последний потребитель.
Посмотрим, есть ли в RabbitMQ какая-либо конфигурация, которую мы можем использовать, или любое предлагаемое решение.
Заранее спасибо!
Комментарии:
1. Обычно потребитель не должен «перегружать» сообщения из-за какого-либо внутреннего ограничения скорости. В зависимости от вашей библиотеки вы можете просто заблокировать обратный вызов subscribe, пока не будете готовы к получению нового сообщения.
Ответ №1:
Я почти уверен, что предварительная выборка потребителя выполнит то, что вы хотите. Но у Q2 может быть только один потребитель, чтобы это работало. Нет способа координировать действия между несколькими потребителями — вам придется делать это самостоятельно, и вы могли бы использовать RabbitMQ для координации.
ПРИМЕЧАНИЕ: команда RabbitMQ отслеживает rabbitmq-users
список рассылки и лишь иногда отвечает на вопросы в StackOverflow.
Комментарии:
1. Большое спасибо, Люк, за быстрый ответ. Я могу сделать это с помощью предварительной выборки и ограничения concurrentConsumers. Но да, я должен ограничить количество потребителей, подписывающихся на этот Q.
Ответ №2:
Я думаю, вы запутываетесь в определении проблемы. То, что вам действительно нужно, тривиально, поэтому давайте немного разберем это.
Учитывая две очереди, Q1
и Q2
- 10 потребителей
- Каждый потребитель может обрабатывать сообщения из Q1 и Q2.
- В любой момент времени только 2 потребителя должны обрабатывать сообщения из Q2.
- Все 10 потребителей могут обрабатывать сообщение из Q1 одновременно.
Комментарии к постановке проблемы
Во-первых, предполагается, что очереди независимы. Независимый процесс P
будет иметь очередь Q
, таким образом, Q1
обслуживает процесс P1
. Это строгое математическое требование — вы не можете определить две очереди для одного процесса P
.
Таким образом, второе ограничение математически неверно по той же причине, по которой вы не смогли написать допустимую функцию, которая принимает параметр типа string
и bool
взаимозаменяемо. Он должен принимать один или другой, поскольку они несовместимые типы, или он должен принимать одного общего предка типов без учета подтипов. Это вариант принципа подстановки Лискова.
Переопределение проблемы
Всего в системе 12 потребителей:
Q1
имеет 10 потребителейQ2
имеет 2 потребителя- [Важно] Потребители не распределяются между очередями
Есть ли какая-либо конфигурация в RabbitMQ, которую мы можем указать, чтобы RabbitMQ отправлял только 2 сообщения из Q2 любому свободному потребителю и отправлял следующие 2 только после их подтверждения, даже если другие потребители свободны и готовы к использованию.
Основываясь на новом определении проблемы, у вас есть два варианта:
- Используйте
Basic.Get
— извлеките следующее сообщение из очереди, как только потребитель закончит обработку последнего сообщения. - Используйте предварительную выборку потребителя с ограничением 1. Это приведет к немедленной доставке первого и второго сообщений для каждого потребителя, а затем к доставке дополнительных сообщений по одному по мере подтверждения следующего сообщения для этого потребителя. Это немного сложнее, но может иметь смысл, если ваши пределы задержки составляют менее 10 миллисекунд.
Обратите внимание, что, правильно определив проблемное пространство, мы устранили фундаментальную проблему, заключающуюся в попытке выяснить, как гарантировать, что только два потребителя обрабатывают Q2
сообщения в любое время.
Ответ №3:
попробуйте новую функцию Single Active Consumer версии 3.8 .
Один активный потребитель позволяет одновременно использовать только одного потребителя из очереди и переключаться на другого зарегистрированного потребителя в случае отмены или смерти активного. Использование только одного потребителя полезно, когда сообщения должны быть получены и обработаны в том же порядке, в каком они поступают в очередь. При объявлении очереди может быть включен один активный потребитель, для аргумента x-single-active-consumer установлено значение true
https://www.rabbitmq.com/consumers.html#single-active-consumer
Комментарии:
1. Хотя ссылка может отвечать на вопрос, возможно, что ссылка будет удалена или информация переместится на другой URL в будущем. Следовательно, лучше всего предоставить часть этой информации в вашем ответе для доступа будущих читателей, даже если это произойдет.