Инициировать обратный вызов для нового сообщения с помощью MQTT paho async publish

#c #eclipse #callback #mqtt #paho

#c #затмение #обратный вызов #мкттт #paho

Вопрос:

Я использую пример кода MQTTAsync_publish.c из MQTT paho. Что я сделал, так это заменил постоянную ПОЛЕЗНУЮ НАГРУЗКУ в примере кода на SendMessageArray. Содержимое этого массива изменяется в моем другом коде, если поступает новое радиосообщение. Полезная нагрузка успешно отправляется при первом подключении.

Однако я не понимаю, как я запускаю новую публикацию при изменении моей полезной нагрузки?

Функция MQTTAsync_sendMessage запускается в обратном вызове OnConnect, поэтому мне просто как-то нужно снова запустить этот обратный вызов, но как. Если я изменю пример кода, чтобы оставаться на связи после первой отправки, не вызывая обратный вызов onSend, то с finished = 0 это «Ожидает публикации %s в теме %s для клиента с ClientID: % s n», но я не могу разместить новую полезную нагрузку.

 volatile MQTTAsync_token deliveredtoken;
char SendMessageArray[4096] = { 0 };  // global
int finished = 0;

void connlost(void *context, char *cause)
{
    MQTTAsync client = (MQTTAsync)context;
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    int rc;

    printf("nConnection lostn");
    printf("     cause: %sn", cause);

    printf("Reconnectingn");
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    if ((rc = MQTTAsync_connect(client, amp;conn_opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start connect, return code %dn", rc);
        finished = 1;
    }
}


void onDisconnect(void* context, MQTTAsync_successData* response)
{
    printf("Successful disconnectionn");
    finished = 1;
}


void onSend(void* context, MQTTAsync_successData* response)
{
    MQTTAsync client = (MQTTAsync)context;
    MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
    int rc;

    printf("Message with token value %d delivery confirmedn", response->token);

    opts.onSuccess = onDisconnect;
    opts.context = client;

    if ((rc = MQTTAsync_disconnect(client, amp;opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start sendMessage, return code %dn", rc);
        exit(EXIT_FAILURE);
    }
}


void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
    printf("Connect failed, rc %dn", response ? response->code : 0);
    finished = 1;
}


void onConnect(void* context, MQTTAsync_successData* response)
{
    MQTTAsync client = (MQTTAsync)context;
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
    int rc;

    printf("Successful connectionn");
    
    opts.onSuccess = onSend; // -> something like onHoldConnect???
    opts.context = client;

    pubmsg.payload = SendMessageArray; //PAYLOAD;
    pubmsg.payloadlen = (int)strlen(SendMessageArray);
    pubmsg.qos = QOS;
    pubmsg.retained = 0;
    deliveredtoken = 0;

    if ((rc = MQTTAsync_sendMessage(client, TOPIC, amp;pubmsg, amp;opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start sendMessage, return code %dn", rc);
        exit(EXIT_FAILURE);
    }
}


int main(int argc, char* argv[])
{
    MQTTAsync client;
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    int rc;

    MQTTAsync_create(amp;client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);

    MQTTAsync_setCallbacks(client, NULL, connlost, NULL, NULL);

    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    conn_opts.onSuccess = onConnect;
    conn_opts.onFailure = onConnectFailure;
    conn_opts.context = client;
    if ((rc = MQTTAsync_connect(client, amp;conn_opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start connect, return code %dn", rc);
        exit(EXIT_FAILURE);
    }

    printf("Waiting for publication of %sn"
         "on topic %s for client with ClientID: %sn",
         SendMessageArray, TOPIC, CLIENTID);
    while (!finished)
        #if defined(WIN32)
            Sleep(100);
        #else
            usleep(10000L);
        #endif

    MQTTAsync_destroy(amp;client);
    return rc;
}
  

Вот исходный код:http://gitlab.witium.com.cn/eclipse/paho.mqtt.c/blob/master/src/samples/MQTTAsync_publish.c

Мне почему-то нужно постоянное соединение с брокером, отправляющим все новые сообщения. Спасибо за вашу помощь.

РЕДАКТИРОВАТЬ: Хорошо, что я сделал сейчас, это (1) сохранить соединение, не отключаясь после отправки:

 void onHoldConnect(void* context, MQTTAsync_successData* response)
{
    printf("Holding connectionn");
}

void onConnect(void* context, MQTTAsync_successData* response)
{
    MQTTAsync client = (MQTTAsync)context;
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
    int rc;

    printf("Successful connectionn");
    opts.onSuccess = onHoldConnect; //onSend;
    opts.onFailure = onSendFailure;
    opts.context = client;
    pubmsg.payload = SendMessageArray; //PAYLOAD;
    pubmsg.payloadlen = (int)strlen(SendMessageArray); 
    pubmsg.qos = QOS;
    pubmsg.retained = 0;

    if ((rc = MQTTAsync_sendMessage(client, TOPIC, amp;pubmsg, amp;opts)) != MQTTASYNC_SUCCESS)
    {
      printf("Failed to start sendMessage, return code %dn", rc);
      exit(EXIT_FAILURE);
    }
}
  

и (2) запустите функцию OnConnect в цикле while с глобальным bool, если SendMessageArray был изменен:

 while (!finished) {
    if (sendFlag == true) {
      sendFlag = false;
      onConnect(client, amp;conn_opts);
    }
    #if defined(_WIN32)
        Sleep(100);
    #else
        usleep(10000L);
    #endif
}
  

Является ли это правильным способом решения проблемы? Я просто снова использовал (client, amp;conn_opts) в качестве параметров.

Комментарии:

1. Я тоже боролся с этим, сработало ли это в конечном итоге у вас?