#go #mqtt #mosquitto
#Вперед #mqtt #mosquitto
Вопрос:
Я вижу непоследовательную доставку сообщений с сохранением сообщений и qos = 2 на mosquitto. Есть ли что-то, что я делаю неправильно?
У меня есть простое тестовое приложение, которое регистрирует тему для использования с помощью ClientID=»receive-client», но сразу же отключается. Затем он подключается как ClientID=»send-client» и публикует 10 сообщений, «сообщение # 1» … «сообщение # 10». Затем отключается, ждет пять секунд и снова подключается для использования с помощью «receive-client» при печати и подсчете полученных сообщений.
Результат противоречив. Иногда я получаю 6 сообщений, иногда 8. Типичный результат выглядит примерно так:
WARN[0005] GOT A MESSAGE:message #1
WARN[0005] GOT A MESSAGE:message #2
WARN[0005] GOT A MESSAGE:message #3
WARN[0005] GOT A MESSAGE:message #4
WARN[0005] GOT A MESSAGE:message #5
WARN[0005] GOT A MESSAGE:message #6
WARN[0005] GOT A MESSAGE:message #7
WARN[0005] GOT A MESSAGE:message #8
WARN[0305] PAUSE
WARN[0605] received message count=8
В моей информации о версии указано 1.4.15. Мой mosquitto.conf:
pid_file /var/run/mosquitto.pid
persistence true
persistence_location /var/lib/mosquitto/
allow_anonymous false
password_file /etc/mosquitto/passwd
log_dest file /var/log/mosquitto/mosquitto.log
Изначально /var/lib/mosquitto/mosquitto.db не отображается, пока не будет выполнено несколько итераций. Мое тестовое приложение здесь:
import (
mqtt "github.com/eclipse/paho.mqtt.golang"
log "github.com/sirupsen/logrus"
"time"
)
var receivedMsg int
func Persist() {
const TOPIC = "test"
const URL = "tcp://localhost:1883"
const USERNAME = "myuser"
const PASSWORD = "mypassword"
defer printReceived()
options := mqtt.NewClientOptions().AddBroker(URL).SetUsername(USERNAME).SetPassword(PASSWORD)
options.SetCleanSession(false)
options.SetConnectRetry(true)
options.SetConnectRetryInterval(10 * time.Millisecond)
// register the receive client with broker / TOPIC
// to be sure the broker knows it needs to save our messages
// to deliver at a later time
options.SetClientID("receive-client")
client := mqtt.NewClient(options)
token := client.Connect()
token.Wait()
if token := client.Subscribe(TOPIC, 2, consume1); token.Wait() amp;amp; token.Error() != nil {
panic(token.Error())
}
client.Disconnect(0)
// connect with send client and send 10 messages
options.SetClientID("send-client")
client = mqtt.NewClient(options)
token = client.Connect()
token.Wait()
client.Publish(TOPIC, 2, false, "message #1")
client.Publish(TOPIC, 2, false, "message #2")
client.Publish(TOPIC, 2, false, "message #3")
client.Publish(TOPIC, 2, false, "message #4")
client.Publish(TOPIC, 2, false, "message #5")
client.Publish(TOPIC, 2, false, "message #6")
client.Publish(TOPIC, 2, false, "message #7")
client.Publish(TOPIC, 2, false, "message #8")
client.Publish(TOPIC, 2, false, "message #9")
client.Publish(TOPIC, 2, false, "message #10")
client.Disconnect(4)
time.Sleep(5* time.Second)
// subscribe again and try to retrieve the messages we missed
options.SetClientID("receive-client")
client = mqtt.NewClient(options)
token = client.Connect()
token.Wait()
if token := client.Subscribe(TOPIC, 2, consume2); token.Wait() amp;amp; token.Error() != nil {
panic(token.Error())
}
time.Sleep(300 * time.Second)
log.Warn("PAUSE")
time.Sleep(300 * time.Second)
}
func consume1(client mqtt.Client, msg mqtt.Message) {
receivedMsg
log.Warn("THIS SHOULD NOT BE CONSUMING ANY MESSAGES:", string(msg.Payload()))
}
func consume2(client mqtt.Client, msg mqtt.Message) {
receivedMsg
log.Warn("GOT A MESSAGE:", string(msg.Payload()))
}
func printReceived() {
log.Warn("received message count=", receivedMsg)
}
Ответ №1:
Публикация на QOS 2 — это многоступенчатый процесс, поэтому наиболее вероятной причиной является то, что вы отключаете публикующий клиент до того, как все сообщения фактически завершат публикацию брокеру. Вероятно, вам следует выполнить эту публикацию в цикле и использовать возвращенный токен из вызова to client.publish()
, чтобы дождаться его завершения, прежде чем отключать клиента.
например, как показано в примере:
//Publish 5 messages to /go-mqtt/sample at qos 1 and wait for the receipt
//from the server after sending each message
for i := 0; i < 5; i {
text := fmt.Sprintf("this is msg #%d!", i)
token := c.Publish("go-mqtt/sample", 0, false, text)
token.Wait()
}
Комментарии:
1. Конечно! Я сгенерировал это как простое представление сервера, и процедура публикации сервера (чтобы я мог публиковать асинхронно) не была защищена группой ожидания. Спасибо.