Клиент Spring AMQP зависает при достижении значения водяного знака

#rabbitmq #spring-amqp #spring-rabbit

#rabbitmq #spring-amqp #spring-rabbit

Вопрос:

Когда сервер RabbitMQ достигает уровня водяного знака, клиент, пытающийся отправить сообщение на сервер RabbitMQ, застревает в состоянии ожидания. Я хочу, чтобы клиент Spring AMQP отключился после ожидания в течение некоторого времени (500 мс). Пожалуйста, найдите ниже трассировку стека :-

  "pool-3-thread-1" #17 prio=5 os_prio=0 tid=0x000000002b7ea800 nid=0x5750 in Object.wait() [0x000000002d9fe000]
       java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000071b4daeb8> (a com.rabbitmq.utility.BlockingValueOrException)
        at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:73)
        - locked <0x000000071b4daeb8> (a com.rabbitmq.utility.BlockingValueOrException)
        at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120)
        - locked <0x000000071b4daeb8> (a com.rabbitmq.utility.BlockingValueOrException)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
        at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1540)
        at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:52)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1190)
        - locked <0x000000071b4b1380> (a java.lang.Object)
        at com.sun.proxy.$Proxy42.txCommit(Unknown Source)
        at org.springframework.amqp.rabbit.connection.RabbitUtils.commitIfNecessary(RabbitUtils.java:141)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doSend(RabbitTemplate.java:2322)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.lambda$send$3(RabbitTemplate.java:1005)
        at org.springframework.amqp.rabbit.core.RabbitTemplate$$Lambda$122/1529091234.doInRabbit(Unknown Source)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.invokeAction(RabbitTemplate.java:2147)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2106)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2058)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:1004)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1110)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1101)
  

Я создаю компонент ConnectionFactory, используя эту конфигурацию XML:-

 <bean id="connectionFactory"
        class="com.example.CustomCachingConnectionFactory">
        <constructor-arg value="#{messagingProperties['mq.hostname']}" />
        <property name="virtualHost" value="#{messagingProperties['mq.virtual-host']}" />
        <property name="username" value="#{messagingProperties['mq.username']}" />
        <property name="password" value="#{messagingProperties['mq.password']}" />
        <property name="channelCacheSize" value="25" />
    </bean>
  

Завод пользовательских подключений :-

 public class CustomCachingConnectionFactory extends CachingConnectionFactory {


    private com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory;

    public CustomCachingConnectionFactory(@Nullable String hostname){
        super(hostname);
    }

    @Override
    public ConnectionFactory getRabbitConnectionFactory() {
        rabbitConnectionFactory = super.getRabbitConnectionFactory();
        rabbitConnectionFactory.useNio();
        return rabbitConnectionFactory;
    }

    @PostConstruct
    public void init() {
        rabbitConnectionFactory.setNioParams(getNioParams());
    }

    private NioParams getNioParams(){
        NioParams nioParams = new NioParams();
        nioParams.setWriteEnqueuingTimeoutInMs(500);
        return nioParams;
    }
}
  

Пожалуйста, предложите, как остановить клиент AMQP от бесконечного ожидания ответа сервера RabbitMQ.

версия amqp-клиента — 5.7.3

версия spring rabbit — 2.2.1.ВЫПУСТИТЬ версию spring amqp — 2.2.1.ВЫПУСТИТЬ ядро spring — 5.2.1.ВЫПУСТИТЬ

Полный дамп потока —https://gist.github.com/CODESTHN/7eedd6881b575977a0e7d69d7228325f

Комментарии:

1. Трассировка стека усекается — вам нужно показать ПОЛНУЮ трассировку стека, а также поток, который заблокирован <0x000000071b4daeb8> (Или опубликовать полный дамп потока где-нибудь, например, в pastebin или в GitHub Gist). Кроме того, какая версия spring-rabbit?

2. Я думаю, что ни один другой поток не заблокировал <0x000000071b4daeb8>, я прикрепил полный дамп потока, пожалуйста, посмотрите.

3. Похоже, что он ожидает ответа на txCommit() . Загляните в журналы сервера, чтобы увидеть, есть ли там какие-либо подсказки. Если нет, я предлагаю вам задать свой вопрос в группе rabbitmq-пользователи Google со ссылкой на эту суть; похоже, это не имеет никакого отношения к Spring; он просто вызывается txCommit() на канале.

4. из документации RabbitMQ : By default, when the RabbitMQ server uses above 40% of the available RAM, it raises a memory alarm and blocks all connections that are publishing messages. я думаю, вам нужно настроить сигнал тревоги, как описано на этой странице (я не понимаю, как вы можете установить время ожидания со стороны сервера)

5. @Jose M — Я намеренно запустил сигнализацию водяного знака для тестирования сценария, когда Rabbit не отвечает, потому что это иногда случается в PROD. Уменьшение времени ожидания канала до 1 минуты со значения по умолчанию в 10 минут помогло мне досрочно прекратить ожидание потоков.