Не удается заставить шаблон NetMQ pub-sub работать с ReceiveReady

#c# #zeromq #publish-subscribe #netmq

#c# #zeromq #опубликовать-подписаться #netmq

Вопрос:

Я пробую свои силы в NetMQ (3.3.3.4) и создаю шаблон pub-sub.

Я хочу, чтобы хост / сервер прослушивал все входящие данные на одном порту (9000) и пересылал данные всем клиентам / подписчикам на другом порту (9001).

Затем клиенты будут отправлять данные на 9000 и получать все сообщения, отправленные (кем бы то ни было) на 9001.

Следуя документации, я создал что-то вроде приведенного ниже кода, но я не могу заставить его работать. В основном, я полагаю, потому что ReceiveReady никогда не вызывается!

Как, по моему мнению, это должно работать:

  • client.Publish должно привести к разблокировке первой строки в host.SubscriberSocket_ReceiveReady и передаче данных в другой сокет
  • Когда данные были переданы, они должны появиться в бесконечном запуске Task в клиенте

Результаты:

  • Точки останова на // This line is never reached никогда не достигаются
  • Нигде нет исключений.
  • Переключение портов на хосте таким образом, чтобы publish = 9000 и subscribe = 9001 не влияли
  • Брандмауэр Windows отключен, поэтому не должно быть никакой блокировки
  • Не имеет значения, помещаю ли я адрес в PublisherSocket конструктор или использую _publisherSocket.Bind(address) в хосте или _publisherSocket.Connect(address) в клиенте

Что я делаю не так?

Хост

 public class MyNetMQHost {

    private NetMQSocket _publishSocket;
    private NetMQSocket _subscribeSocket;
    private NetMQPoller _poller;

    public MyNetMQHost(string publishAddress = "@tcp://localhost:9001", string subscribeAddress = "@tcp://localhost:9000") {
        Task.Factory.StartNew(() => {
            using (_publishSocket = new PublisherSocket(publishAddress))
            using (_subscribeSocket = new SubscriberSocket(subscribeAddress))
            using (_poller = new NetMQPoller { _publishSocket, _subscribeSocket }) {
                _subscriberSocket.ReceiveReady  = SubscriberSocket_ReceiveReady;
                _poller.Run();
            }
        });
    }

    private void SubscriberSocket_ReceiveReady(object sender, NetMQSocketEventArgs e) {
        var data = e.Socket.ReceiveMultipartBytes(); // This line is never reached
        _publishSocket.SendMultipartBytes(data);
    }
}
  

Клиент

 public class MyNetMQClient {

    private readonly NetMQSocket _publishSocket;
    private readonly NetMQSocket _subscribeSocket;

    public MyNetMQClient(string publishAddress = ">tcp://localhost:9000", string subscribeAddress = ">tcp://localhost:9001") {
        _publishSocket = new PublisherSocket(publishAddress);
        _subscribeSocket = new SubscriberSocket(subscribeAddress);

        Task.Factory.StartNew(() => {
            while (true) {
                byte[] frameBytes = _subscribeSocket.ReceiveFrameBytes();
                int one = 1; // This line is never reached
            }
        });
    }

    public void Publish(byte[] data) {
        _publishSocket.SendFrame(data);
    }
}
  

Тестировщик

 public class Tester {
    public void MyTester() {
        MyNetMQHost host = new MyNetMQHost();
        MyNetMQClient client = new MyNetMQClient();

        client.Publish(Encoding.Unicode.GetBytes("Hello world!");
    }
}
  

Ответ №1:

И ваш брокер, и клиент никогда не вызывают suscribe. На брокере вызовите suscriber.Subscribe(«»), чтобы подписаться на все. На вашем клиенте подписывайтесь на все, что хотите.

В вашем брокере вам действительно следует использовать XSubscriber и XPublisher для перемещения susvriptions. Таким образом, вам не нужно подписываться на все. Для этого вы можете использовать прокси-класс.

Комментарии:

1. После прочтения документации о XSub / XPub и некоторых проб и ошибок, я думаю, что у меня получилось! Спасибо 🙂