#go #rabbitmq
#Вперед #rabbitmq
Вопрос:
Я пишу consumer для RabbitMQ с помощью Go, который должен на некоторое время приостановить использование сообщений, а затем восстановиться, чтобы снова использовать сообщения из очереди. Во время чтения документации https://godoc.org/github.com/streadway/amqp Я не смог определить механизм, который мне нужно реализовать в моем коде.
Возможно ли это сделать? Есть пример?
Фрагмент моего кода:
rabbitMQMessages, err = ch.Consume(
"TestQ",
"testConsumer",
false,
true,
false,
false,
nil,
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
select {
case d := <-rabbitMQMessages: // Cheking if messge was recieved
log.Printf("Received a message: %s", d.Body)
dotcount := bytes.Count(d.Body, []byte("."))
err = ch.Flow(false) // Returns error: Exception (540) Reason: "NOT_IMPLEMENTED - active=false
failOnError(err, "Failed to close channel")
t := time.Duration(dotcount)
time.Sleep(t * time.Second)
log.Printf("Done")
err = ch.Flow(true)
d.Ack(false)
default:
log.Println("Default section")
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL C")
<-forever
Комментарии:
1. Я никогда не использовал эту библиотеку, но простой поиск в связанных документах для «pause» показывает метод Flow . Разве это не то, что вам нужно?
2. @Flimzy Я знал об этом методе. К сожалению, он этого не делает, o я не знаю, как это использовать. Я получаю сообщение об ошибке, как только пытаюсь приостановить его.
Exception (540) Reason: "NOT_IMPLEMENTED - active=false"
Ответ №1:
Я смог с этим разобраться. Мне нужно Close
подключиться, а затем снова открыть его. Это предотвращает предварительное чтение сообщений. Не уверен, что это правильный путь, но у меня это сработало. Добавляю фрагмент моего тестового кода.
func main() {
var rabbitMQMessages <-chan amqp.Delivery
var err error
var rabbitMQ RabbitMQ
err = rabbitMQ.dial()
failOnError(err, "Failed to connect to RabbitMQ")
defer rabbitMQ.Close()
err = rabbitMQ.setUpChannel()
failOnError(err, "Failed to open a channel")
err = rabbitMQ.Consumme()
failOnError(err, "Failed to consume")
log.Printf(" [*] Waiting for messages. To exit press CTRL C")
rabbitMQMessages = rabbitMQ.GetChan()
for {
select {
case d, ok := <-rabbitMQMessages: // Cheking if messge was recieved
log.Printf("Chan status at start of function %t", ok)
if !ok {
err = rabbitMQ.setUpChannel()
failOnError(err, "Unable to open channel")
defer rabbitMQ.Close()
err = rabbitMQ.Consumme()
failOnError(err, "Recover. Failed to register a consumer")
rabbitMQMessages = rabbitMQ.GetChan()
continue
}
log.Printf("Chan status at later of function %t", ok)
log.Printf("Received a message: %s", d.Body)
dotcount := bytes.Count(d.Body, []byte("."))
d.Ack(false)
err = rabbitMQ.CloseChannel()
failOnError(err, "Failed to close channel")
t := time.Duration(dotcount)
time.Sleep(t * time.Second)
log.Printf("Done")
}
}
}
Ответ №2:
Вам следует отменить прием, затем повторно запустить ch.Consume
в то время, когда вы хотите возобновить прием сообщений.
ПРИМЕЧАНИЕ: команда RabbitMQ отслеживает rabbitmq-users
список рассылки и лишь иногда отвечает на вопросы в StackOverflow.