#rabbitmq #spring-amqp #spring-rabbit
#rabbitmq #spring-amqp #spring-rabbit
Вопрос:
Когда сервер RabbitMQ достигает уровня водяного знака, клиент, пытающийся отправить сообщение на сервер RabbitMQ, застревает в состоянии ожидания. Я хочу, чтобы клиент Spring AMQP отключился после ожидания в течение некоторого времени (500 мс). Пожалуйста, найдите ниже трассировку стека :-
"pool-3-thread-1" #17 prio=5 os_prio=0 tid=0x000000002b7ea800 nid=0x5750 in Object.wait() [0x000000002d9fe000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000071b4daeb8> (a com.rabbitmq.utility.BlockingValueOrException)
at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:73)
- locked <0x000000071b4daeb8> (a com.rabbitmq.utility.BlockingValueOrException)
at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120)
- locked <0x000000071b4daeb8> (a com.rabbitmq.utility.BlockingValueOrException)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1540)
at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:52)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1190)
- locked <0x000000071b4b1380> (a java.lang.Object)
at com.sun.proxy.$Proxy42.txCommit(Unknown Source)
at org.springframework.amqp.rabbit.connection.RabbitUtils.commitIfNecessary(RabbitUtils.java:141)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSend(RabbitTemplate.java:2322)
at org.springframework.amqp.rabbit.core.RabbitTemplate.lambda$send$3(RabbitTemplate.java:1005)
at org.springframework.amqp.rabbit.core.RabbitTemplate$$Lambda$122/1529091234.doInRabbit(Unknown Source)
at org.springframework.amqp.rabbit.core.RabbitTemplate.invokeAction(RabbitTemplate.java:2147)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2106)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2058)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:1004)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1110)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1101)
Я создаю компонент ConnectionFactory, используя эту конфигурацию XML:-
<bean id="connectionFactory"
class="com.example.CustomCachingConnectionFactory">
<constructor-arg value="#{messagingProperties['mq.hostname']}" />
<property name="virtualHost" value="#{messagingProperties['mq.virtual-host']}" />
<property name="username" value="#{messagingProperties['mq.username']}" />
<property name="password" value="#{messagingProperties['mq.password']}" />
<property name="channelCacheSize" value="25" />
</bean>
Завод пользовательских подключений :-
public class CustomCachingConnectionFactory extends CachingConnectionFactory {
private com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory;
public CustomCachingConnectionFactory(@Nullable String hostname){
super(hostname);
}
@Override
public ConnectionFactory getRabbitConnectionFactory() {
rabbitConnectionFactory = super.getRabbitConnectionFactory();
rabbitConnectionFactory.useNio();
return rabbitConnectionFactory;
}
@PostConstruct
public void init() {
rabbitConnectionFactory.setNioParams(getNioParams());
}
private NioParams getNioParams(){
NioParams nioParams = new NioParams();
nioParams.setWriteEnqueuingTimeoutInMs(500);
return nioParams;
}
}
Пожалуйста, предложите, как остановить клиент AMQP от бесконечного ожидания ответа сервера RabbitMQ.
версия amqp-клиента — 5.7.3
версия spring rabbit — 2.2.1.ВЫПУСТИТЬ версию spring amqp — 2.2.1.ВЫПУСТИТЬ ядро spring — 5.2.1.ВЫПУСТИТЬ
Полный дамп потока —https://gist.github.com/CODESTHN/7eedd6881b575977a0e7d69d7228325f
Комментарии:
1. Трассировка стека усекается — вам нужно показать ПОЛНУЮ трассировку стека, а также поток, который заблокирован
<0x000000071b4daeb8>
(Или опубликовать полный дамп потока где-нибудь, например, в pastebin или в GitHub Gist). Кроме того, какая версия spring-rabbit?2. Я думаю, что ни один другой поток не заблокировал <0x000000071b4daeb8>, я прикрепил полный дамп потока, пожалуйста, посмотрите.
3. Похоже, что он ожидает ответа на
txCommit()
. Загляните в журналы сервера, чтобы увидеть, есть ли там какие-либо подсказки. Если нет, я предлагаю вам задать свой вопрос в группе rabbitmq-пользователи Google со ссылкой на эту суть; похоже, это не имеет никакого отношения к Spring; он просто вызываетсяtxCommit()
на канале.4. из документации RabbitMQ :
By default, when the RabbitMQ server uses above 40% of the available RAM, it raises a memory alarm and blocks all connections that are publishing messages.
я думаю, вам нужно настроить сигнал тревоги, как описано на этой странице (я не понимаю, как вы можете установить время ожидания со стороны сервера)5. @Jose M — Я намеренно запустил сигнализацию водяного знака для тестирования сценария, когда Rabbit не отвечает, потому что это иногда случается в PROD. Уменьшение времени ожидания канала до 1 минуты со значения по умолчанию в 10 минут помогло мне досрочно прекратить ожидание потоков.