Невозможно установить заголовки после их отправки клиенту. Кролик

#javascript #node.js #express #rabbitmq #microservices

Вопрос:

У меня есть приложение NodeJS. Я использую rabbitmq для подключения между микросервисами. Я получаю: Не удается установить заголовки после их отправки клиенту. Когда я отправляю HTTP-запрос в первый раз, я получаю свои данные правильно, но второй запрос вызывает эту ошибку. Почему это происходит? Вот мой пользовательский сервис:

 const connect = async () => {
    try {
        const amqpServer = "amqp://localhost:5672"
        connection = await amqp.connect(amqpServer)
        channel = await connection.createChannel()
        await channel.assertQueue(usersQueue)
        channel.consume(usersQueue, msg => {
            const token = JSON.parse(msg.content.toString()).token
            channel.ack(msg)
            if (token !== undefined amp;amp; token !== null) {
                User.findOne({token: token}).exec((err, user) => {
                    if (err) {
                        return channel.sendToQueue(usersQueue, Buffer.from(JSON.stringify(err)))
                    } else {
                        return channel.sendToQueue(usersQueue, Buffer.from(JSON.stringify(user)))
                    }
                })
            }
        })
    } catch (err) {
        console.log(err)
    }
}
connect()
 

Вот моя история-сервис:

 const connect = async () => {
    try {
        const amqpServer = "amqp://localhost:5672"
        connection = await amqp.connect(amqpServer)

        channel = await connection.createChannel()
        await channel.assertQueue(historyQueue)

        channel.consume(historyQueue, msg => {
            const audioId = JSON.parse(msg.content.toString()).audioId
            channel.ack(msg)
            if (audioId !== undefined amp;amp; audioId !== null) {
                History.find({audio: audioId}).exec((err, history) => {
                    if (err) {
                        return channel.sendToQueue(historyQueue, Buffer.from(JSON.stringify({err, status: false})))
                    } else {
                        return channel.sendToQueue(historyQueue, Buffer.from(JSON.stringify({history, status: true})))
                    }
                })
            }
        })
    } catch (err) {
        console.log(err)
    }
}
connect()
 

Вот мой аудио-сервис с api:

 const connect = async () => {
    try {
        const amqpServer = "amqp://localhost:5672"
        connection = await amqp.connect(amqpServer)

        channel = await connection.createChannel()
        await channel.assertQueue(usersQueue)
        await channel.assertQueue(historyQueue)
    } catch (err) {
        console.log(err)
    }
}
connect()
app.get("/get/analytics", (req, res) => {
    const token = {token: req.headers.token}
    if (req.headers.token !== undefined) {
        channel.sendToQueue(usersQueue, Buffer.from(JSON.stringify(token)))
        channel.consume(usersQueue, userMsg => {
            const writer = JSON.parse(userMsg.content.toString())
            channel.ack(userMsg)
            if (writer._id != null) {
                Audio.findOne({_id: req.headers.id, writer: writer._id, status: 10}).exec((err, audio) => {
                    if (err) {
                        return res.status(400).send(err.message)
                    } else {
                        channel.sendToQueue(historyQueue, Buffer.from(JSON.stringify({audioId: audio._id})))
                        channel.consume(historyQueue, historyMsg => {
                            channel.ack(historyMsg)
                            const history = JSON.parse(historyMsg.content.toString()).history
                            const noErr = JSON.parse(historyMsg.content.toString()).status
                            const errMessage = JSON.parse(historyMsg.content.toString()).err

                            if (noErr) {
                                res.status(200).json({audio, history})
                            } else {
                                res.status(500).json(errMessage)
                            }
                        })
                    }
                })
            } else {
                return res.status(403).json("Token is false")
            }
        })
    }
});
 

Комментарии:

1. Какой именно запрос вызывает ошибку в заголовках? И почему вы показываете три разных набора кода подключения? Какое это имеет отношение к вопросу?

2. @jfriend00, это три разных файла.

3. Какую библиотеку RabbitMQ вы используете? Пожалуйста, разместите ссылку на модуль и документацию.

4. @jfriend00, получить запрос api

5. Пожалуйста, прочтите оба моих вопроса еще раз. Ты не ответил ни на один из них.

Ответ №1:

Основываясь на некоторой документации RabbitMQ, которую я нашел, похоже, что channel.consume() вызов настраивает потребителя для сообщений на этом канале, и этот потребитель длится так же долго, как и канал. Итак, при первом /get/analytics попадании вы настраиваете потребителя для этого канала, а затем при втором попадании по этому маршруту вы настраиваете ДРУГОГО потребителя для того же канала, но первый все еще активен. Таким образом, когда сообщения поступают на этот канал, у вас будут дубликаты потребителей, обрабатывающих их, и, таким образом, вы в конечном итоге попытаетесь отправить ответы на старые запросы (от исходного потребителя, который все еще активен), и, следовательно, получите сообщение об ошибке.

Я совсем ничего не понимаю, что именно вы пытаетесь сделать здесь, но вы можете либо .cancel() потребителю после того как вы закончите с ним, чтобы не остаться вокруг и по-прежнему получать будущие сообщения, или вы можете удалить создании потребительского изнутри вашему запросу обработчик только один когда-либо созданных и почему вашему запросу обработчик будет получать уведомления от потребителя у вас есть.

Примечание: Мне кажется, что эта общая структура будет зависеть от условий гонки всякий раз /get/analytics , когда в игре одновременно задействовано несколько обработчиков маршрутов, поскольку невозможно определить, какой запрос, находящийся в игре, должен получать какие сообщения из очереди.


Кроме того, обратите внимание, что, когда ваше состояние:

  if (req.headers.token !== undefined) {
 

не выполнено, то вы не отправляете никакого ответа на запрос. Вы должны отправлять какой-то ответ на каждый запрос, возможно, захотите отправить код статуса 4xx, возможно, 401, если отсутствующий токен означает, что они не авторизованы.


Для дальнейшей отладки вы можете добавить некоторые полезные записи в журнал:

 const cntr = 0;
app.get("/get/analytics", (req, res) => {
    // log which request we're starting and keep a localCntr
    const localCntr = cntr  ;
    console.log(`Start of request #${localCntr}`);

    const token = {token: req.headers.token}
    if (req.headers.token !== undefined) {
        channel.sendToQueue(usersQueue, Buffer.from(JSON.stringify(token)))
        channel.consume(usersQueue, userMsg => {
            console.log(`${localCntr}: Getting channel.consume() callback on usersQueue`);
            const writer = JSON.parse(userMsg.content.toString())
            channel.ack(userMsg)
            if (writer._id != null) {
                Audio.findOne({_id: req.headers.id, writer: writer._id, status: 10}).exec((err, audio) => {
                    console.log(`${localCntr}: Got Audio.findOne() result`);
                    if (err) {
                        return res.status(400).send(err.message)
                    } else {
                        channel.sendToQueue(historyQueue, Buffer.from(JSON.stringify({audioId: audio._id})))
                        channel.consume(historyQueue, historyMsg => {
                            console.log(`${localCntr}: Getting channel.consume() callback on historyQueue`);
                            channel.ack(historyMsg)
                            const history = JSON.parse(historyMsg.content.toString()).history
                            const noErr = JSON.parse(historyMsg.content.toString()).status
                            const errMessage = JSON.parse(historyMsg.content.toString()).err

                            console.log(`${localCntr}: Sending response`);
                            if (noErr) {
                                res.status(200).json({audio, history})
                            } else {
                                res.status(500).json(errMessage)
                            }
                        })
                    }
                })
            } else {
                return res.status(403).json("Token is false")
            }
        })
    }
});
 

Комментарии:

1. Привет, это все еще не работает. Я использую его вот так — channel.consume(audioHistoryQueue, historyMsg => {channel.ack(historyMsg) channel.cancel(audioHistoryQueue)

2. @DaniilAndreev — Я не знаю API, с которым вы работаете. Я попросил ссылку на документ для любой библиотеки, которую вы используете, но вы этого не предоставили, поэтому на данный момент я не могу предложить больше.

3. Я использую amqplib. Вот ссылка на документы — squaremobius.net/amqp.node/channel_api.html

4. @DaniilAndreev — Я добавил кучу журналов отладки к примеру в своем ответе. Это регистрирует, какие именно обратные вызовы вы получаете, из какого места и с какого запроса # они поступили. Эти журналы, вероятно, сообщат вам, откуда поступают дополнительные обратные вызовы. При вставке этого журнала я вижу два отдельных вызова channel.consume() . Один на том, historyQueue а usersQueue другой на том . Вам придется сделать .cancel() и то, и другое, когда вы закончите с данным запросом, чтобы избежать получения повторяющихся уведомлений о будущих событиях.

5. @DaniilAndreev — Ключевая часть этих отладочных сообщений заключается console.log(`${localCntr}: Some other text `); в том, что каждое отладочное сообщение помечается порядковым номером, указывающим, от какого http-запроса отладочное сообщение. Я предполагаю, что вы видите отладочные сообщения от старых запросов, которые все еще встречаются, и это то, что вызывает ошибки.