amqp-c не отправляет в очередь, когда инструкции разделены между двумя функциями, но работает, если набор команд находится в одной функции. Почему?

c #rabbitmq #amqp #rabbitmq-c

#c #rabbitmq #amqp #rabbitmq-c

Вопрос:

ВАЖНОЕ ЗАМЕЧАНИЕ:

Этот вопрос был отредактирован из-за комментария Икегами. Я надеюсь, что фактический вопрос теперь более ясен. Пожалуйста, обратите внимание, что на момент написания статьи было без десяти час ночи, и добавленный код не проходил через IDE или какой-либо редактор кода, поэтому возможны опечатки, ошибки отступов и грубые ошибки. Пожалуйста, проявите понимание, когда дело дойдет до этого, спасибо. Вернемся к первоначальному вопросу.

Я оборачиваю свой мозг вокруг AMQP, и после экспериментов с несколькими библиотеками rabbitmq-c приблизил меня к успеху. Я все еще работаю над примерами, но даже они доставляют мне огорчение, некоторый код для контекста, сначала рабочая версия:

main.cpp:

 #include "handler.h"
#include <QTimer>
int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);
    Handler* myHandler = new Handler();
    return a.exec();
}
 

Обработчик.h

 #define HANDLER_H

#include <QObject>
#include "rabbitmq-c/amqp.h"
#include "rabbitmq-c/tcp_socket.h"

class Handler : public QObject
{
    Q_OBJECT

private:
    amqp_socket_t* socket;
    amqp_connection_state_t connectionState;
    char const *hostname;
    int port, status;
    char const *exchange;
    char const *routingkey;
    char const *messagebody;

public:
    explicit Handler(QObject *parent = nullptr);
    void sendMessage(QString queue_name, QString message);

signals:

};

#endif // HANDLER_H
 

и Handler.cpp

 #include "rabbitmq-c/amqp.h"
#include "rabbitmq-c/tcp_socket.h"
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <iostream>
#include<QDebug>


Handler::Handler(QObject *parent) : QObject(parent)
{

      hostname = "ip.to.rabbitmq.server";
      port = 5672;
      exchange = "";
      routingkey = "someRoutingKey";
      messagebody = "HOOH";

      connectionState = amqp_new_connection();

      socket = amqp_tcp_socket_new(connectionState);
      if (!socket) {
        std::cout << "die: socket is false";
      }

      status = amqp_socket_open(socket, hostname, port);
      if (status) {
        std::cout << "die: opening TCP socket" << std::endl;
      }

      amqp_login(connectionState, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
                                   "guest", "guest");
      amqp_channel_open(connectionState, 1);
      amqp_get_rpc_reply(connectionState);

      {
        amqp_basic_properties_t props;
        props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
        props.content_type = amqp_cstring_bytes("text/plain");
        props.delivery_mode = 2; /* persistent delivery mode */
        amqp_basic_publish(connectionState, 1, amqp_cstring_bytes(exchange),
                                        amqp_cstring_bytes(routingkey), 0, 0,
                                        amp;props, amqp_cstring_bytes(messagebody));
      }

      amqp_channel_close(connectionState, 1, AMQP_REPLY_SUCCESS);
      amqp_connection_close(connectionState, AMQP_REPLY_SUCCESS);
      amqp_destroy_connection(connectionState);
}

void Handler::sendMessage(QString queue_name, QString message)
{
    
}
 

What I actually want to do, and what I don’t manage to do would look something like:

main.cpp

 #include <QCoreApplication>
#include "handler.h"
#include <QTimer>
int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);
    Handler* myHandler = new Handler();
    QTimer::singleShot(5000, [myHandler]()->void
    {
        myHandler->sendMessage("someRoutingKey", "HOOH");
        QTimer::singleShot(5000, [myHandler]()->void
        {
             myHandler->close();
             QCoreApplication::quit();
        }
    });
    return a.exec();
}
 

Details of main are not imprtant here and the file is here just to display the manner in which I want to use my Handler class:

  1. create the Handler class,
  2. delay for a time to make sure we don’t trip into delays of communicating across network, then send a message to a queue. Finally
  3. delay for a time, then close the connection and shut down the program

Handler.h

 #define HANDLER_H

#include <QObject>
#include "rabbitmq-c/amqp.h"
#include "rabbitmq-c/tcp_socket.h"

class Handler : public QObject
{
    Q_OBJECT

private:
    amqp_socket_t* socket;
    amqp_connection_state_t connectionState;
    char const *hostname;
    int port, status;
    char const *exchange;
    char const *routingkey;
    char const *messagebody;

public:
    explicit Handler(QObject *parent = nullptr);
    void sendMessage(QString queue_name, QString message);
    void close();

signals:

};

#endif // HANDLER_H
 

и handler.cpp

 #include "handler.h"
#include "rabbitmq-c/amqp.h"
#include "rabbitmq-c/tcp_socket.h"
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <iostream>
#include<QDebug>


Handler::Handler(QObject *parent) : QObject(parent)
{

      hostname = "ip.to.rabbitmq.server";
      port = 5672;
      exchange = "";
      routingkey = "someRoutingKey";
      messagebody = "HOOH";

      connectionState = amqp_new_connection();

      socket = amqp_tcp_socket_new(connectionState);
      if (!socket) {
        std::cout << "die: socket is false";
      }

      status = amqp_socket_open(socket, hostname, port);
      if (status) {
        std::cout << "die: opening TCP socket" << std::endl;
      }

      amqp_login(connectionState, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
                                   "guest", "guest");
      

      amqp_channel_close(connectionState, 1, AMQP_REPLY_SUCCESS);
      amqp_connection_close(connectionState, AMQP_REPLY_SUCCESS);
      amqp_destroy_connection(connectionState);
}

void Handler::sendMessage(QString queue_name, QString message)
{
    amqp_channel_open(connectionState, 1);
      amqp_get_rpc_reply(connectionState);

      {
        amqp_basic_properties_t props;
        props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
        props.content_type = amqp_cstring_bytes("text/plain");
        props.delivery_mode = 2; /* persistent delivery mode */
        amqp_basic_publish(connectionState, 1, amqp_cstring_bytes(exchange),
                                        amqp_cstring_bytes(routingkey), 0, 0,
                                        amp;props, amqp_cstring_bytes(messagebody));
      }
}

void Handler::close()
{
      amqp_channel_close(connectionState, 1, AMQP_REPLY_SUCCESS);
      amqp_connection_close(connectionState, AMQP_REPLY_SUCCESS);
      amqp_destroy_connection(connectionState);
}


 

Вы можете видеть различия между текущей рабочей реализацией и реализацией, которую я хотел бы иметь. Насколько я понимаю, разделение инструкций с одной функции на несколько не должно влиять на функциональность, но, похоже, это так. Я пробовал с QTimers и без них, но безрезультатно.

В чем проблема? Я бы хотел разделить формирование соединения, отправку сообщения и закрытие соединения на их собственные функции, но что бы я ни пытался, это не работает. Пример библиотеки, с которой я работаю, можно найти здесь .

Как уже говорилось ранее, я пробовал разные библиотеки: обе версии QAMQP, найденные здесь и здесь, а также сырую реализацию C , найденную здесь . Из всех трех rabbitmq-c приблизил меня к прорыву.

Любая помощь будет принята с благодарностью, спасибо.

Редактировать

Я переделал примеры кода, чтобы сделать мою точку зрения более понятной

Комментарии:

1. @ikegami Теперь я отредактировал вопрос. Надеюсь, мое намерение теперь яснее.