#c# #zeromq #publish-subscribe #netmq
#c# #zeromq #опубликовать-подписаться #netmq
Вопрос:
Я пробую свои силы в NetMQ (3.3.3.4) и создаю шаблон pub-sub.
Я хочу, чтобы хост / сервер прослушивал все входящие данные на одном порту (9000) и пересылал данные всем клиентам / подписчикам на другом порту (9001).
Затем клиенты будут отправлять данные на 9000 и получать все сообщения, отправленные (кем бы то ни было) на 9001.
Следуя документации, я создал что-то вроде приведенного ниже кода, но я не могу заставить его работать. В основном, я полагаю, потому что ReceiveReady
никогда не вызывается!
Как, по моему мнению, это должно работать:
client.Publish
должно привести к разблокировке первой строки вhost.SubscriberSocket_ReceiveReady
и передаче данных в другой сокет- Когда данные были переданы, они должны появиться в бесконечном запуске
Task
в клиенте
Результаты:
- Точки останова на
// This line is never reached
никогда не достигаются - Нигде нет исключений.
- Переключение портов на хосте таким образом, чтобы publish = 9000 и subscribe = 9001 не влияли
- Брандмауэр Windows отключен, поэтому не должно быть никакой блокировки
- Не имеет значения, помещаю ли я адрес в
PublisherSocket
конструктор или использую_publisherSocket.Bind(address)
в хосте или_publisherSocket.Connect(address)
в клиенте
Что я делаю не так?
Хост
public class MyNetMQHost {
private NetMQSocket _publishSocket;
private NetMQSocket _subscribeSocket;
private NetMQPoller _poller;
public MyNetMQHost(string publishAddress = "@tcp://localhost:9001", string subscribeAddress = "@tcp://localhost:9000") {
Task.Factory.StartNew(() => {
using (_publishSocket = new PublisherSocket(publishAddress))
using (_subscribeSocket = new SubscriberSocket(subscribeAddress))
using (_poller = new NetMQPoller { _publishSocket, _subscribeSocket }) {
_subscriberSocket.ReceiveReady = SubscriberSocket_ReceiveReady;
_poller.Run();
}
});
}
private void SubscriberSocket_ReceiveReady(object sender, NetMQSocketEventArgs e) {
var data = e.Socket.ReceiveMultipartBytes(); // This line is never reached
_publishSocket.SendMultipartBytes(data);
}
}
Клиент
public class MyNetMQClient {
private readonly NetMQSocket _publishSocket;
private readonly NetMQSocket _subscribeSocket;
public MyNetMQClient(string publishAddress = ">tcp://localhost:9000", string subscribeAddress = ">tcp://localhost:9001") {
_publishSocket = new PublisherSocket(publishAddress);
_subscribeSocket = new SubscriberSocket(subscribeAddress);
Task.Factory.StartNew(() => {
while (true) {
byte[] frameBytes = _subscribeSocket.ReceiveFrameBytes();
int one = 1; // This line is never reached
}
});
}
public void Publish(byte[] data) {
_publishSocket.SendFrame(data);
}
}
Тестировщик
public class Tester {
public void MyTester() {
MyNetMQHost host = new MyNetMQHost();
MyNetMQClient client = new MyNetMQClient();
client.Publish(Encoding.Unicode.GetBytes("Hello world!");
}
}
Ответ №1:
И ваш брокер, и клиент никогда не вызывают suscribe. На брокере вызовите suscriber.Subscribe(«»), чтобы подписаться на все. На вашем клиенте подписывайтесь на все, что хотите.
В вашем брокере вам действительно следует использовать XSubscriber и XPublisher для перемещения susvriptions. Таким образом, вам не нужно подписываться на все. Для этого вы можете использовать прокси-класс.
Комментарии:
1. После прочтения документации о XSub / XPub и некоторых проб и ошибок, я думаю, что у меня получилось! Спасибо 🙂