#.net #rabbitmq #synchronous
#.net #rabbitmq #синхронный
Вопрос:
В .СЕТЕВАЯ версия (2.4.1) RabbitMQ RabbitMQ.Client.Шаблоны сообщений.Simplerpclient имеет метод Call() с этими сигнатурами:
public virtual object[] Call(params object[] args);
public virtual byte[] Call(byte[] body);
public virtual byte[] Call(IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties);
Проблема:
При различных попытках метод по-прежнему продолжает не блокировать там, где я ожидаю, поэтому он никогда не сможет обработать ответ.
Вопрос:
Я упускаю что-то очевидное в настройке simplerpclient или ранее с IModel, IConnection или даже с адресом публикации?
Подробная информация:
Я также пробовал различные конфигурации параметров метода QueueDeclare(), но безуспешно.
string QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments);
Еще несколько справочных кодов моей настройки этих:
IConnection conn = new ConnectionFactory{Address = "127.0.0.1"}.CreateConnection());
using (IModel ch = conn.CreateModel())
{
var client = new SimpleRpcClient(ch, queueName);
var queueName = ch.QueueDeclare("t.qid", true, true, true, null);
ch.QueueBind(queueName, "exch", "", null);
//HERE: does not block?
var replyMessageBytes = client.Call(prop, msgToSend, out replyProp);
}
Поиск в другом месте:
Или, скорее всего, проблема в моем «серверном» коде? С использованием basicAck() и без него кажется, что клиент уже продолжил выполнение.
Ответ №1:
—КОРОТКИЙ ОТВЕТ—
Часть «Вы делаете это неправильно«…
Проверьте IBasicProperties, и вы должны использовать SimpleRpcServer с HandleSimpleCall()
Если вы наткнетесь на этот вопрос, вы либо выбрали неправильный подход, как я, и, возможно, совершили аналогичную ошибку, неправильно манипулируя IBasicProperties, тем самым повредив способности SimpleRpcServer корректно функционировать.
—ДЛИННЫЙ ОТВЕТ—
Рабочий пример для .NET: Найдите мой рабочий пример на BitBucket здесь:
https://bitbucket.org/NickJosevski/synchronous-rabbitmq-sample-.net
Или вот краткий пример…
На стороне клиента:
IConnection conn = new ConnectionFactory{Address = "127.0.0.1"}.CreateConnection();
using (IModel ch = conn.CreateModel())
{
ch.ExchangeDeclare(Helper.ExchangeName, "direct");
var queueName = ch.EnsureQueue();
var client = new SimpleRpcClient(ch, queueName);
var msgToSend = new Message(/*data*/).Serialize();
IBasicProperties replyProp;
var reply = client.Call(new BasicProperties(), msgToSend, out replyProp);
}
На стороне сервера:
IConnection conn = new ConnectionFactory{Address = "127.0.0.1"}.CreateConnection();
using (IModel ch = conn.CreateModel())
{
ch.ExchangeDeclare(Helper.ExchangeName, "direct");
var queuename = ch.EnsureQueue();
var subscription = new Subscription(ch, queuename);
new MySimpleRpcServerSubclass(subscription).MainLoop();
}
internal class MySimpleRpcServerSubclass : SimpleRpcServer
{
public MySimpleRpcServerSubclass(Subscription subscription)
: base(subscription) { }
public override byte[] HandleSimpleCall(
bool isRedelivered, IBasicProperties requestProperties,
byte[] body, out IBasicProperties replyProperties)
{
replyProperties = requestProperties;
replyProperties.MessageId = Guid.NewGuid().ToString();
var m = Message.Deserialize(body);
var r =
new Response
{
Message = String.Format("Got {0} with {1}", m.Name, m.Body)
};
return r.Serialize();
}
}
Общий доступ:
//helper:
public static string EnsureQueue(this IModel ch)
{
var queueName = ch.QueueDeclare(QueueId, false, false, false, null);
ch.QueueBind(queueName, ExchangeName, "", null);
return queueName;
}
//NOTE:
not all extension methods are explained here, such as *.Serialize()*
as they're not relevant and just make for a cleaner example.
Комментарии:
1. Рассматривали ли вы возможность запуска
.MainLoop()
в отдельном потоке, используя что-то вродеSystem.Threading.Tasks.Task.Factory.StartNew(...MainLoop);
? Причина, по которой я спрашиваю, заключается в том, что у меня есть код, который должен прослушивать несколько отдельных очередей, и я думаю, это означает, что у меня должен быть simplerpccserver для каждой очереди, который запускает свой собственный MainLoop в отдельном потоке. Я не могу найти хороший пример этого в Интернете.