Временная пауза RabbitMQ consummer

#go #rabbitmq

#Вперед #rabbitmq

Вопрос:

Я пишу consumer для RabbitMQ с помощью Go, который должен на некоторое время приостановить использование сообщений, а затем восстановиться, чтобы снова использовать сообщения из очереди. Во время чтения документации https://godoc.org/github.com/streadway/amqp Я не смог определить механизм, который мне нужно реализовать в моем коде.

Возможно ли это сделать? Есть пример?

Фрагмент моего кода:

 rabbitMQMessages, err = ch.Consume(
        "TestQ",
        "testConsumer",
        false,
        true,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        select {
        case d := <-rabbitMQMessages: // Cheking if messge was recieved
            log.Printf("Received a message: %s", d.Body)
            dotcount := bytes.Count(d.Body, []byte("."))

            err = ch.Flow(false) // Returns error: Exception (540) Reason: "NOT_IMPLEMENTED - active=false
            failOnError(err, "Failed to close channel")

            t := time.Duration(dotcount)
            time.Sleep(t * time.Second)
            log.Printf("Done")

            err = ch.Flow(true)

            d.Ack(false)
        default:
            log.Println("Default section")
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL C")
    <-forever
  

Комментарии:

1. Я никогда не использовал эту библиотеку, но простой поиск в связанных документах для «pause» показывает метод Flow . Разве это не то, что вам нужно?

2. @Flimzy Я знал об этом методе. К сожалению, он этого не делает, o я не знаю, как это использовать. Я получаю сообщение об ошибке, как только пытаюсь приостановить его. Exception (540) Reason: "NOT_IMPLEMENTED - active=false"

Ответ №1:

Я смог с этим разобраться. Мне нужно Close подключиться, а затем снова открыть его. Это предотвращает предварительное чтение сообщений. Не уверен, что это правильный путь, но у меня это сработало. Добавляю фрагмент моего тестового кода.

 func main() {
    var rabbitMQMessages <-chan amqp.Delivery
    var err error
    var rabbitMQ RabbitMQ

    err = rabbitMQ.dial()
    failOnError(err, "Failed to connect to RabbitMQ")
    defer rabbitMQ.Close()

    err = rabbitMQ.setUpChannel()
    failOnError(err, "Failed to open a channel")

    err = rabbitMQ.Consumme()
    failOnError(err, "Failed to consume")

    log.Printf(" [*] Waiting for messages. To exit press CTRL C")

    rabbitMQMessages = rabbitMQ.GetChan()

    for {
        select {
        case d, ok := <-rabbitMQMessages: // Cheking if messge was recieved
            log.Printf("Chan status at start of function %t", ok)

            if !ok {
                err = rabbitMQ.setUpChannel()
                failOnError(err, "Unable to open channel")
                defer rabbitMQ.Close()

                err = rabbitMQ.Consumme()
                failOnError(err, "Recover. Failed to register a consumer")

                rabbitMQMessages = rabbitMQ.GetChan()

                continue
            }

            log.Printf("Chan status at later of function %t", ok)

            log.Printf("Received a message: %s", d.Body)
            dotcount := bytes.Count(d.Body, []byte("."))
            d.Ack(false)

            err = rabbitMQ.CloseChannel()
            failOnError(err, "Failed to close channel")
            t := time.Duration(dotcount)
            time.Sleep(t * time.Second)
            log.Printf("Done")
        }
    }

}

  

Ответ №2:

Вам следует отменить прием, затем повторно запустить ch.Consume в то время, когда вы хотите возобновить прием сообщений.


ПРИМЕЧАНИЕ: команда RabbitMQ отслеживает rabbitmq-users список рассылки и лишь иногда отвечает на вопросы в StackOverflow.