#rabbitmq #amqp #spring-amqp
#rabbitmq #amqp #spring-amqp
Вопрос:
Я пытаюсь использовать обратный вызов publisher returns для объявления очереди и привязок, если маршрут недоступен, чтобы сообщения больше не отбрасывались. Это потому, что моя очередь автоматически удаляется и будет удалена, если мой потребитель выйдет из строя.
Но поток ReturnCallback застревает в returnedMessage() в admin.declareQueue(очередь).
При дальнейшей отладке я вижу, что он застрял в RabbitAdmin.declareQueue() по адресу: DeclareOk[] declared = declareQueues(канал, очередь);
Хотя этот вызов застрял, я вижу объявленную очередь (проверяется через консоль). Также последующие вызовы отправки не вызывают returnedMessage, поскольку, вероятно, первый вызов returnedMessage еще не вернулся.
Я делаю что-то не так здесь? Правильно ли объявлять очередь / привязки при обратном обратном вызове?
Любая помощь была бы очень признательна. Спасибо.
Ниже приведен мой обратный вызов:
public class MyReturnCallback implements ReturnCallback {
// constructor, member initialization goes here
@Override
public void returnedMessage(Message message, int replyCode,
String replyText, String exchangeName, String routingKey) {
if (replyCode == 312) {
if (this.exchangeName.equals(exchangeName) amp;amp; this.routingKey.equals(routingKey)) {
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
Exchange exchange = new DirectExchange(exchangeName, true, false);
Queue queue = new Queue(queueName, true, false, true);
admin.declareQueue(queue);
Binding binding = BindingBuilder.bind(queue).to((DirectExchange)exchange).with(routingKey);
admin.declareBinding(binding);
if (null != binding) {
RabbitTemplate rabbitmqTemplate = new RabbitTemplate(connectionFactory);
logger.debug("Sending to [exchange:" exchange.getName() ", routing-key:" routingKey "]:" message.toString());
rabbitmqTemplate.send(exchangeName, routingKey, message);
}
}
}
}
}
И мой производитель тестов — это что-то вроде:
public class TestProducer {
// constructor, member initialization goes here
void initialize()
rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setExchange(exchangeName);
rabbitTemplate.setMessageConverter(messageConverter);
rabbitTemplate.setRoutingKey(routingKey);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitReturnCallback());
Exchange exchange = new DirectExchange(exchangeName, true, false);
rabbitAdmin.declareExchange(exchange);
Queue queue = new Queue(queueName, true, false, true);
rabbitAdmin.declareQueue(queue);
Binding binding = BindingBuilder.bind(queue).to((DirectExchange)exchange).with(routingKey);
rabbitAdmin.declareBinding(binding);
}
void send() {
rabbitTemplate.convertAndSend(message);
}
}
Компонент ConnectionFactory является:
<rabbit:connection-factory id="rabbitmqConnectionFactory"
host="${rabbitmq.host:localhost}"
port="${rabbitmq.port:5672}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.vhost:/}"
cache-mode="CHANNEL"
channel-cache-size="${rabbitmq.channel-cache-size:25}"
publisher-returns="true"/>
Прикрепление журналов отладки для справки:
2016-10-04 17:58:58,881 [main] INFO CachingConnectionFactory:291 - Created new connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/]
2016-10-04 17:58:58,883 [main] DEBUG RabbitAdmin:399 - Initializing declarations
2016-10-04 17:58:58,883 [main] DEBUG DefaultListableBeanFactory:250 - Returning cached instance of singleton bean 'org.springframework.context.annotation.ConfigurationClassPostProcessor.importRegistry'
2016-10-04 17:58:58,935 [main] DEBUG CachingConnectionFactory:453 - Creating cached Rabbit Channel from PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1)
2016-10-04 17:58:58,975 [main] DEBUG PublisherCallbackChannelImpl:694 - Added listener org.springframework.amqp.rabbit.core.RabbitTemplate@67f639d3
2016-10-04 17:58:58,976 [main] DEBUG RabbitTemplate:1451 - Added pending confirms for Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/] to map, size now 1
2016-10-04 17:58:58,976 [main] DEBUG RabbitTemplate:1296 - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/]
2016-10-04 17:58:58,979 [main] TRACE CachingConnectionFactory:906 - Returning cached Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1)
2016-10-04 17:58:58,979 [main] DEBUG RabbitAdmin:460 - Declarations finished
2016-10-04 17:58:58,980 [main] TRACE CachingConnectionFactory:402 - Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/] retrieved from cache
2016-10-04 17:58:58,981 [main] TRACE CachingConnectionFactory:439 - Found cached Rabbit Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/]
2016-10-04 17:58:58,981 [main] DEBUG PublisherCallbackChannelImpl:694 - Added listener org.springframework.amqp.rabbit.core.RabbitTemplate@82de64a
2016-10-04 17:58:58,982 [main] DEBUG RabbitTemplate:1451 - Added pending confirms for Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/] to map, size now 1
2016-10-04 17:58:58,982 [main] DEBUG RabbitTemplate:1296 - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/]
2016-10-04 17:58:58,982 [main] DEBUG RabbitAdmin:487 - declaring Exchange 'myexchange'
2016-10-04 17:58:59,006 [main] TRACE CachingConnectionFactory:906 - Returning cached Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1)
2016-10-04 17:58:59,007 [main] INFO TestProducer:59 - Declared/Declare-confirmed for direct exchange: myexchange
2016-10-04 17:58:59,009 [main] DEBUG TestProducer:74 - Sending to [exchange:myexchange, routing-key:mykey]:[testpayload]
2016-10-04 17:58:59,090 [main] TRACE CachingConnectionFactory:402 - Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/] retrieved from cache
2016-10-04 17:58:59,091 [main] TRACE CachingConnectionFactory:439 - Found cached Rabbit Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/]
2016-10-04 17:58:59,091 [main] DEBUG PublisherCallbackChannelImpl:694 - Added listener org.springframework.amqp.rabbit.core.RabbitTemplate@793f29ff
2016-10-04 17:58:59,091 [main] DEBUG RabbitTemplate:1451 - Added pending confirms for Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/] to map, size now 1
2016-10-04 17:58:59,091 [main] DEBUG RabbitTemplate:1296 - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/]
2016-10-04 17:58:59,098 [main] DEBUG RabbitTemplate:1325 - Publishing message on exchange [myexchange], routingKey = [mykey]
2016-10-04 17:58:59,104 [main] TRACE CachingConnectionFactory:906 - Returning cached Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1)
2016-10-04 17:58:59,134 [AMQP Connection ip-address:5672] DEBUG TestProducer:103 - returnedMessage, replyCode: 312, replyText: NO_ROUTE
2016-10-04 17:58:59,135 [AMQP Connection ip-address:5672] TRACE CachingConnectionFactory:402 - Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/] retrieved from cache
2016-10-04 17:58:59,136 [AMQP Connection ip-address:5672] TRACE CachingConnectionFactory:439 - Found cached Rabbit Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/]
2016-10-04 17:58:59,163 [AMQP Connection ip-address:5672] DEBUG PublisherCallbackChannelImpl:694 - Added listener org.springframework.amqp.rabbit.core.RabbitTemplate@7ec6c641
2016-10-04 17:58:59,163 [AMQP Connection ip-address:5672] DEBUG RabbitTemplate:1451 - Added pending confirms for Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/] to map, size now 1
2016-10-04 17:58:59,163 [AMQP Connection ip-address:5672] DEBUG RabbitTemplate:1296 - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://test@ip-address:5672/,1), conn: Proxy@bae7dc0 Shared Rabbit Connection: SimpleConnection@4f2b503c [delegate=amqp://test@ip-address:5672/]
2016-10-04 17:58:59,164 [AMQP Connection ip-address:5672] DEBUG RabbitAdmin:515 - declaring Queue 'myqueue'
2016-10-04 17:59:29,104 [main] DEBUG TestProducer:74 - Sending to [exchange:myexchange, routing-key:mykey]:[testpayload]
Nothing happens after this.
Комментарии:
1. Интересно; Я воспроизвел это — расследование…
Ответ №1:
Интересная проблема — поскольку канал помещается обратно в кэш после первоначальной отправки, когда возврат доставляется по этому каналу, отправка в обратном вызове получает тот же канал, и это вызывает взаимоблокировку в канале — выполнение объявления, пока мы все еще обрабатываем возврат.
Мы не можем откладывать возврат канала обратно в кэш, пока не получим возврат, потому что мы не знаем, получим ли мы когда-нибудь возврат.
Я немного подумаю о том, есть ли какой-либо способ обнаружить это условие и избежать его, но в то же время, самое безопасное, что вы можете сделать, это обработать объявление и повторно опубликовать в другом потоке…
private final Executor executor = Executors.newCachedThreadPool();
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchangeName,
String routingKey) {
if (replyCode == 312) {
executor.execute(() -> {
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
Exchange exchange = new DirectExchange(exchangeName, true, false);
Queue queue = new Queue("foo", true, false, true);
admin.declareQueue(queue);
Binding binding = BindingBuilder.bind(queue).to((DirectExchange) exchange).with(routingKey);
admin.declareBinding(binding);
if (null != binding) {
RabbitTemplate rabbitmqTemplate = new RabbitTemplate(connectionFactory);
System.out.println("Sending to [exchange:" exchange.getName() ", routing-key:" routingKey
"]:" message.toString());
rabbitmqTemplate.send(exchangeName, routingKey, message);
}
});
}
}
Комментарии:
1. Большое спасибо @GaryRussell за столь быстрый ответ. Я попробую предложенный вами подход. Спасибо.
2. Попробовал предложенный вами подход. Решил мою проблему. Спасибо!