Как я могу получать составные сообщения с помощью ZeroMQ?

#c #zeromq

#c #zeromq

Вопрос:

Я не могу заставить оболочку ZeroMQ C получать составные сообщения. Тот же код, использующий версию C, работает просто отлично, но это приводит к исключению без каких-либо объяснений вообще с C . Код обработки, состоящий из нескольких частей, выглядит следующим образом:

 int _tmain(int argc, _TCHAR* argv[])
{
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_REP);
    socket.bind("tcp://*:5555");

    while(true) {
        // the following two lines lead to exception
        zmq::message_t request;
        socket.recv(amp;request);

        //zmq_msg_t message;
        //zmq_msg_init (amp;message);
        //zmq_recv (socket, amp;message, 0);   
    }

    return 0;
}
  

Это чрезвычайно просто; эта версия не работает. но если я закомментирую первые две строки в цикле while и раскомментирую текущий комментируемый код (версия C), это сработает.
Это Windows XP sp3, Zeromq 2.1.1 и Visual Studio 2010 Express.

Если я отправляю сообщения из одной части, обе версии работают нормально. Что я делаю не так?

Комментарии:

1. Мне сказали обновить ZMQ до последней версии, за исключением того, что код считается исправным.

Ответ №1:

Я тоже новичок в ZMQ, и мне тоже пришлось немало потрудиться, чтобы разобраться в составных сообщениях с использованием REP / REQ в ZeroMQ. Мне пришлось просмотреть несколько ресурсов и объединить данные, чтобы понять это. Я думаю, что этот ответ поможет многим ищущим в ближайшем будущем, поэтому я делюсь клиентским и серверным кодом здесь. Я протестировал этот код, и он работает отлично. Однако, будучи новичком, есть вероятность, что я пропустил бы что-то важное. Пожалуйста, поделитесь своими ценными данными.

Серверный код

 void
serverMultipartREPREQ()
{
    try
    {
        zmq::context_t context(1);
        zmq::socket_t socket(context, ZMQ_REP);
        socket.bind("tcp://*:5556");
        std::cout << "Listening at port 5556..." << std::endl;

        zmq::message_t reply;

        socket.recv(reply, zmq::recv_flags::none);
        auto rep = std::string(static_cast<char*> (reply.data()), reply.size());

        std::cout << "Received: " << rep << std::endl;
        
        while(1)
        {    
            if (input == "exit")
                break;

            for (int j = 0; j < 3;   j)
            {
                std::string s("Message no - "   std::to_string(j));

                zmq::message_t message(s.length());
                memcpy(message.data(), s.c_str(), s.length());

                std::cout << "Sending: " << s << std::endl;

                if (j != 2)
                    socket.send(message, zmq::send_flags::sndmore);
                else
                    socket.send(message, zmq::send_flags::none); 
            }
        }
    }
    catch (const zmq::error_tamp; ze)
    {
        std::cout << "Exception: " << ze.what() << std::endl;
    }

    Sleep(5000);
}
  

Клиентский код

 void
clientMultipartREQREP()
{
    try
    {
        zmq::context_t context(1);

        std::cout << "Connecting to socket at 5556" << std::endl;
        zmq::socket_t socket(context, ZMQ_REQ);
        socket.connect("tcp://localhost:5556");
        std::cout << "Connected to socket at 5556" << std::endl;

        std::string msg("Hii this is client...");
        zmq::message_t message(msg.length());
        memcpy(message.data(), msg.c_str(), msg.length());

        socket.send(message, zmq::send_flags::none); // send to server (request message)

        while (true)
        {
            __int64 more = 1;

            if (more)
            {
                zmq::message_t message;
                socket.recv(message, zmq::recv_flags::none);
                auto rep = std::string(static_cast<char*> (message.data()), message.size());
                std::cout << "Reading from client: " << rep << std::endl;

                size_t size = sizeof(__int64);
                socket.getsockopt(ZMQ_RCVMORE, amp;more, amp;size); // if msg is not the last one then more = 1 else more = 0
            }
            else
            {
                std::cout << "Done..." << std::endl;
                break;
            }
        }
    }
    catch (const zmq::error_tamp; ze)
    {
        std::cout << "Exception: " << ze.what() << std::endl;
    }
    Sleep(5000);
}
  

Комментарии:

1. Я ценю приложенные усилия, но я немного сбит с толку. Обычно я ожидаю, что клиент будет отправлять несколько фреймов данных на сервер, но это только мой вариант использования. Этот пример имеет некоторый смысл, но раньше допускал только одно сообщение с несколькими фреймами Exception: Operation cannot be accomplished in current state Я бы предпочел, чтобы клиент получал набор многокомпонентных сообщений после каждого нажатия кнопки на сервере в качестве примера

Ответ №2:

Возможно, версия кода на C также не работает, но вы не проверяете код возврата zmq_recv, поэтому вы этого не замечаете. Кроме того, при получении многочастных сообщений вы должны проверить, есть ли еще части сообщения, которые должны быть получены через сокет, например, так:

 int64_t more = 0;
size_t more_size = sizeof(more);
socket.getsockopt(ZMQ_RCVMORE, amp;more, amp;more_size);
if (more != 0)
{
  //has more parts
}
  

Кроме того, взгляните на библиотеку ZmqMessage C , разработанную специально для отправки и получения составных сообщений ZeroMQ.

Ответ №3:

Я решил использовать C-версию кода. В общем, все примеры, похоже, в любом случае на C.