#spring #rabbitmq #spring-amqp #spring-rabbit
Вопрос:
Мы используем RabbitMQ с конфигурациями весенней загрузки по умолчанию. У нас есть вариант использования, в котором мы не хотим параллелизма для одного из слушателей. То есть мы хотим, чтобы в любой данный момент времени работал только один поток потребителя. Мы хотим этого, потому что природа варианта использования такова, что мы хотим, чтобы сообщения обрабатывались по порядку, поэтому, если на одного потребителя приходится несколько потоков, может быть вероятность того, что сообщения обрабатываются не по порядку. Поскольку мы используем значения по умолчанию и явно не изменили контейнер, мы используем SimpleMessageListenerContainer
. Просмотрев документацию, я попытался зафиксировать количество пользователей, которые ее используют concurrency = "1"
. Аннотация целевого метода выглядит следующим @RabbitListener(queues = ["queue-name"], concurrency = "1")
образом .
Согласно документации, это должно было гарантировать, что существует только поток потребителей.
{@ссылка org.springframework.amqp.кролик.слушатель.SimpleMessageListenerContainer * SimpleMessageListenerContainer} если это значение является простым целым числом, оно задает фиксированное * количество потребителей в свойстве {@code concurrentConsumers}
2021-10-29 06:11:26.361 INFO 29752 --- [ntContainer#4-1] c.t.t.i.p.s.xxx : Created xxx
2021-10-29 06:11:26.383 INFO 29752 --- [ntContainer#0-1] c.t.t.i.p.s.xxx : Created xxx
Идентификаторы потоков, которые следует отметить здесь, являются [ntContainer#4-1]
и [ntContainer#0-1]
.
Итак, вопрос в том, как мы можем гарантировать, что на каждого потребителя в любой данный момент времени приходится только один поток ?
Изменить: Добавление кода потребительского класса для большего контекста
@ConditionalOnProperty(value = ["rabbitmq.sharebooking.enabled"], havingValue = "true", matchIfMissing = false)
class ShareBookingConsumer @Autowired constructor(
private val shareBookingRepository: ShareBookingRepository,
private val objectMapper: ObjectMapper,
private val shareDtoToShareBookingConverter: ShareBookingDtoToShareBookingConverter
) {
private val logger = LoggerFactory.getLogger(javaClass)
init {
logger.info("start sharebooking created consumer")
}
@RabbitListener(queues = ["tax_engine.share_booking"], concurrency = "1-1", exclusive = true)
@Timed
@Transactional
fun consumeShareBookingCreatedEvent(message: Message) {
try {
consumeShareBookingCreatedEvent(message.body)
} catch (e: Exception) {
throw AmqpRejectAndDontRequeueException(e)
}
}
private fun consumeShareBookingCreatedEvent(event: ByteArray) {
toShareBookingCreationMessageEvent(event).let { creationEvent ->
RmqMetrics.measureEventMetrics(creationEvent)
val shareBooking = shareDtoToShareBookingConverter.convert(creationEvent.data)
val persisted = shareBookingRepository.save(shareBooking)
logger.info("Created shareBooking ${creationEvent.data.id}")
}
}
private fun toShareBookingCreationMessageEvent(event: ByteArray) =
objectMapper.readValue(event, shareBookingCreateEventType)
companion object {
private val shareBookingCreateEventType =
object : TypeReference<RMQMessageEnvelope<ShareBookingCreationDto>>() {}
}
}
Edit: Adding application thread analysis using visualvm
5 threads get created for 5 listeners.
[1]: https://i.stack.imgur.com/gQINE.png
Ответ №1:
Набор concurrency = "1-1"
. Обратите внимание , что параллелизм Прослушивателя зависит не только от concurrentConsumers
, но и от maxConcurrentConsumers
:
Если вы используете настраиваемую фабрику:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cachingConnectionFactory);
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
return factory;
}
См.: https://docs.spring.io/spring-amqp/docs/current/reference/html/#simplemessagelistenercontainer для деталей.
РЕДАКТИРОВАТЬ: Я провел простой тест, 2 потребителя и 2 потока:
@RabbitListener(queues = "myQueue111", concurrency = "1-1")
public void handleMessage(Object message) throws InterruptedException {
LOGGER.info("Received message : {} in {}", message, Thread.currentThread().getName());
}
@RabbitListener(queues = "myQueue222", concurrency = "1-1")
public void handleMessag1e(Object message) throws InterruptedException {
LOGGER.info("Received message222 : {} in {}", message, Thread.currentThread().getName());
}
Комментарии:
1. это аннотация, которую я использую прямо сейчас,
@RabbitListener(queues = ["queue-name"], concurrency = "1-1", exclusive = true)
но я все еще вижу 2 потока в журналах приложений. Я вижу журнал запуска потребителя ровно один раз, и это здорово.2021-11-09 20:32:02.990 DEBUG 4418 --- [ntContainer#0-1] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@61ecf3d6: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
но я все еще вижу [ntContainer#0-1] и [ntContainer#4-1] в журналах для одного и того же потребителя.2. У вас есть пять
RabbitListener
с разной очередью, что означает, что будет пять потоков: `ntContainer#0-1, ntContainer#1-1, ntContainer#2-1`,ntContainer#3-1, ntContainer#4-1` по одному для каждого слушателя. Но потребителем каждой очереди всегда должен быть один и тот же поток.3. проблема в том, что это одна и та же потребительская обработка в разных потоках. [ntContainer#0-1] и [ntContainer#4-1] создаются для одного и того же потребителя. Это я могу подтвердить, просмотрев журналы
2021-11-09 20:32:03.345 INFO 4418 --- [ntContainer#4-1] c.t.t.i.p.s.ShareBookingConsumer : Created shareBooking 904ad244-c57b-4e53-bb12-62a63271277d and 021-11-09 20:32:03.369 INFO 4418 --- [ntContainer#0-1] c.t.t.i.p.s.ShareBookingConsumer : Created shareBooking 43fc735b-c2d7-4531-9f70-77e24ddab6b8
4. Можете ли вы отредактировать свой вопрос, чтобы показать код для
ShareBookingConsumer
класса? @аашиш5. Будет ли только один экземпляр
ShareBookingConsumer
? Попробуйте добавитьLOGGER.info(String.valueOf(this.hashCode()));
проверку, если это один и тот же экземпляр? @аашиш
Ответ №2:
Попробуй это:
@RabbitListener(queues = ["queue-name"], exclusive = true)
Видишь https://docs.spring.io/spring-amqp/docs/current/reference/html/#exclusive-consumer
Комментарии:
1.
2021-11-09 18:56:01.898 INFO 884 --- [ntContainer#4-1] c.t.t.i.p.s. Created 2021-11-09 18:56:01.931 INFO 884 --- [ntContainer#0-1] c.t.t.i.p.s. : Created
Попробовал ваше предложение, но получил тот же результат, что и в журналах приложений. Это аннотация, которую я использовал@RabbitListener(queues = ["queue-name"], concurrency = "1", exclusive = true)
2. Сколько аннотаций RabbitListener есть в вашем приложении spring ?
3. у нас есть 5
@RabbitListener
аннотаций в приложении, но каждая привязана к другой очереди. Я попытался отладить больше после включения весеннего ведения журнала отладки amqp. этот журнал2021-11-09 20:32:02.990 DEBUG 4418 --- [ntContainer#0-1] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@61ecf3d6: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
появляется только один раз, что гарантируетconcurrency
правильное соблюдение настроек. Если я установлюconcurrency = "10"
, я увижу 10 таких журналов.4. Я понимаю. Так почему же вы думаете, что в вашем Слушателе есть параллелизм? Если вы видите только [ntContainer#0-1], это означает, что у вас есть только один потребитель для этого слушателя. [ntContainer#4-1] является еще одним потребителем для другого слушателя.
5. это не может быть так, что журнал поступает от другого прослушивателя. Если я представлю вам полные журналы
2021-11-09 20:32:03.345 INFO 4418 --- [ntContainer#4-1] c.t.t.i.p.s.ShareBookingConsumer : Created shareBooking 904ad244-c57b-4e53-bb12-62a63271277d
021-11-09 20:32:03.369 INFO 4418 --- [ntContainer#0-1] c.t.t.i.p.s.ShareBookingConsumer : Created shareBooking 43fc735b-c2d7-4531-9f70-77e24ddab6b8
, и, таким образом, будет запущен один и тот же класс и метод. В классе есть только одна@RabbitListener
аннотацияShareBookingConsumer