непоследовательное сохранение в mosquitto

#go #mqtt #mosquitto

#Вперед #mqtt #mosquitto

Вопрос:

Я вижу непоследовательную доставку сообщений с сохранением сообщений и qos = 2 на mosquitto. Есть ли что-то, что я делаю неправильно?

У меня есть простое тестовое приложение, которое регистрирует тему для использования с помощью ClientID=»receive-client», но сразу же отключается. Затем он подключается как ClientID=»send-client» и публикует 10 сообщений, «сообщение # 1» … «сообщение # 10». Затем отключается, ждет пять секунд и снова подключается для использования с помощью «receive-client» при печати и подсчете полученных сообщений.

Результат противоречив. Иногда я получаю 6 сообщений, иногда 8. Типичный результат выглядит примерно так:

 WARN[0005] GOT A MESSAGE:message #1                     
WARN[0005] GOT A MESSAGE:message #2                     
WARN[0005] GOT A MESSAGE:message #3                     
WARN[0005] GOT A MESSAGE:message #4                     
WARN[0005] GOT A MESSAGE:message #5                     
WARN[0005] GOT A MESSAGE:message #6                     
WARN[0005] GOT A MESSAGE:message #7                     
WARN[0005] GOT A MESSAGE:message #8                     
WARN[0305] PAUSE                                        
WARN[0605] received message count=8                     
  

В моей информации о версии указано 1.4.15. Мой mosquitto.conf:

 pid_file /var/run/mosquitto.pid

persistence true
persistence_location /var/lib/mosquitto/

allow_anonymous false
password_file /etc/mosquitto/passwd

log_dest file /var/log/mosquitto/mosquitto.log
  

Изначально /var/lib/mosquitto/mosquitto.db не отображается, пока не будет выполнено несколько итераций. Мое тестовое приложение здесь:

 import (
    mqtt "github.com/eclipse/paho.mqtt.golang"
    log "github.com/sirupsen/logrus"
    "time"
)

var receivedMsg int

func Persist() {
    const TOPIC = "test"
    const URL = "tcp://localhost:1883"
    const USERNAME = "myuser"
    const PASSWORD = "mypassword"

    defer printReceived()

    options := mqtt.NewClientOptions().AddBroker(URL).SetUsername(USERNAME).SetPassword(PASSWORD)
    options.SetCleanSession(false)
    options.SetConnectRetry(true)
    options.SetConnectRetryInterval(10 * time.Millisecond)

    // register the receive client with broker / TOPIC
    // to be sure the broker knows it needs to save our messages
    // to deliver at a later time
    options.SetClientID("receive-client")
    client := mqtt.NewClient(options)
    token := client.Connect()
    token.Wait()
    if token := client.Subscribe(TOPIC, 2, consume1); token.Wait() amp;amp; token.Error() != nil {
        panic(token.Error())
    }
    client.Disconnect(0)

    // connect with send client and send 10 messages
    options.SetClientID("send-client")
    client = mqtt.NewClient(options)
    token = client.Connect()
    token.Wait()

    client.Publish(TOPIC, 2, false, "message #1")
    client.Publish(TOPIC, 2, false, "message #2")
    client.Publish(TOPIC, 2, false, "message #3")
    client.Publish(TOPIC, 2, false, "message #4")
    client.Publish(TOPIC, 2, false, "message #5")
    client.Publish(TOPIC, 2, false, "message #6")
    client.Publish(TOPIC, 2, false, "message #7")
    client.Publish(TOPIC, 2, false, "message #8")
    client.Publish(TOPIC, 2, false, "message #9")
    client.Publish(TOPIC, 2, false, "message #10")
    client.Disconnect(4)
    time.Sleep(5* time.Second)

    // subscribe again and try to retrieve the messages we missed
    options.SetClientID("receive-client")
    client = mqtt.NewClient(options)
    token = client.Connect()
    token.Wait()

    if token := client.Subscribe(TOPIC, 2, consume2); token.Wait() amp;amp; token.Error() != nil {
        panic(token.Error())
    }

    time.Sleep(300 * time.Second)
    log.Warn("PAUSE")
    time.Sleep(300 * time.Second)
}

func consume1(client mqtt.Client, msg mqtt.Message) {
    receivedMsg  
    log.Warn("THIS SHOULD NOT BE CONSUMING ANY MESSAGES:", string(msg.Payload()))
}

func consume2(client mqtt.Client, msg mqtt.Message) {
    receivedMsg  
    log.Warn("GOT A MESSAGE:", string(msg.Payload()))
}

func printReceived() {
    log.Warn("received message count=", receivedMsg)
}
  

Ответ №1:

Публикация на QOS 2 — это многоступенчатый процесс, поэтому наиболее вероятной причиной является то, что вы отключаете публикующий клиент до того, как все сообщения фактически завершат публикацию брокеру. Вероятно, вам следует выполнить эту публикацию в цикле и использовать возвращенный токен из вызова to client.publish() , чтобы дождаться его завершения, прежде чем отключать клиента.

например, как показано в примере:

 //Publish 5 messages to /go-mqtt/sample at qos 1 and wait for the receipt
//from the server after sending each message
for i := 0; i < 5; i   {
  text := fmt.Sprintf("this is msg #%d!", i)
  token := c.Publish("go-mqtt/sample", 0, false, text)
  token.Wait()
}
  

Комментарии:

1. Конечно! Я сгенерировал это как простое представление сервера, и процедура публикации сервера (чтобы я мог публиковать асинхронно) не была защищена группой ожидания. Спасибо.