#rabbitmq #spring-amqp #spring-cloud-connectors
#rabbitmq #spring-amqp #spring-cloud-connectors
Вопрос:
У нашей команды возникли некоторые проблемы с Spring AMQP при использовании cloud foundry connectors. Всякий раз, когда в сети возникает некоторая нестабильность, соединение прерывается, тогда AMQP, похоже, пытается автоматически восстановиться, не удается автоматически восстановить, затем создает нового потребителя.
Проблема в том, что RabbitMQ, похоже, не отменяет регистрацию старых потребителей… таким образом, в итоге у нас получается 40 потребителей вместо обычного одного потребителя. И я думаю, что из-за большого количества недействительных потребителей старые потребители получают некоторые сообщения, но не будут запускать наш код. Итак, у нас есть куча нераспакованных сообщений, пока приложение не будет перезапущено и не будет очищен список потребителей.
Я не уверен, является ли это неправильной настройкой с нашей стороны или ошибкой. Есть идеи?
Дополнительная информация ниже.
Часть наших журналов:
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 com.rabbitmq.client.TopologyRecoveryException: Caught an exception while recovering consumer amq.ctag-Dqik8vz9dSPiUzMbM4uGbw: connection is already closed due to connection error; cause: java.net.SocketException: Connection reset
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 2019-03-04 00:31:47.823 ERROR [Atlas Backend,,,] 23 --- [0.32.27.77:5672] c.r.c.impl.ForgivingExceptionHandler : Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag-t7_vY47weZNYtWagl__pYA: connection is already closed due to connection error; cause: java.net.SocketException: Connection reset
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:198) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 2019-03-04 00:31:47.823 ERROR [Atlas Backend,,,] 23 --- [0.32.27.77:5672] c.r.c.impl.ForgivingExceptionHandler : Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag-AK1YOpN0_E2KVwSi4fzOaw: connection is already closed due to connection error; cause: java.net.SocketException: Connection reset
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.AMQConnection.notifyRecoveryCanBeginListeners(AMQConnection.java:693) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.AMQConnection.doFinalShutdown(AMQConnection.java:687) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:577) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 2019-03-04 00:31:47.822 ERROR [Atlas Backend,,,] 23 --- [0.32.27.77:5672] c.r.c.impl.ForgivingExceptionHandler : Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag-vrWhRdhk3ejrXo2-ImjXzA: connection is already closed due to connection error; cause: java.net.SocketException: Connection reset
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:244) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:60) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 ... 7 common frames omitted
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:657) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:53) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:657) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:577) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1242) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1242) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:657) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.AMQConnection.doFinalShutdown(AMQConnection.java:687) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.AMQConnection.notifyRecoveryCanBeginListeners(AMQConnection.java:693) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:198) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:60) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:60) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:673) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1242) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
Мы регистрируем наших потребителей, используя аннотацию @RabbitListener
@RabbitListener(id = CONSUMER_ID, queues = "${CommonInternalEndPoint}", containerFactory = "rabbitLocalContainerFactory")
protected void process(Object messageObject) {
Поскольку мы подключаемся к нескольким экземплярам rabbit mq, нам нужно вручную объявить фабрику соединений. Наша фабрика соединений основана на соединителях cloud foundry:
@Configuration
@Profile("cloud")
public class RabbitCloudConfiguration {
@Bean
public Cloud cloud(){
return new CloudFactory().getCloud();
}
@Bean
@Primary
public ConnectionFactory connectionFactory(){
return cloud().getSingletonServiceConnector(ConnectionFactory.class, null);
}
@Bean("rabbitLocalContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitLocalContainerFactory(){
SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(connectionFactory());
containerFactory.setAutoStartup(true);
return containerFactory;
}
}
Комментарии:
1. Какую версию Spring AMQP вы используете?
2. Мы используем Spring Boot 1.5.9, фреймворк 4.3.6, AMQP 1.7.4, клиент RabbitMQ AMQP 4.0.3
Ответ №1:
Мы используем Spring Boot 1.5.9, фреймворк 4.3.6, AMQP 1.7.4, клиент RabbitMQ AMQP 4.0.3
Начиная с версии 2.0, Spring AMQP отключает automaticRecoveryEnabled
для целевого RabbitMQ ConnectionFactory
: https://docs.spring.io/spring-amqp/docs/2.1.4.RELEASE/reference/#auto-recovery
Поскольку используемая вами версия основана на RabbitMQ ConnectionFactory
, предоставляемом Cloud Connectors, вам необходимо явно отключить ее:
@Bean
@Primary
public ConnectionFactory connectionFactory(){
ConnectionFactory connectionFactory = cloud().getSingletonServiceConnector(ConnectionFactory.class, null);
((CachingConnectionFactory) connectionFactory).getRabbitConnectionFactory().setAutomaticRecoveryEnabled(false);
return connectionFactory;
}
И ваше решение будет основано на механизме восстановления, предоставляемом Spring AMQP.
Комментарии:
1. Я не был уверен, стоит ли мне это делать, потому что, согласно документации, оба восстановления должны «нормально воспроизводиться». Но я вижу, что это не тот случай, если они отключены в последних версиях.
2. Как я уже сказал: это отключено по умолчанию, начиная с Spring AMQP 2.0