#javascript #node.js #express #rabbitmq #microservices
Вопрос:
У меня есть приложение NodeJS. Я использую rabbitmq для подключения между микросервисами. Я получаю: Не удается установить заголовки после их отправки клиенту. Когда я отправляю HTTP-запрос в первый раз, я получаю свои данные правильно, но второй запрос вызывает эту ошибку. Почему это происходит? Вот мой пользовательский сервис:
const connect = async () => {
try {
const amqpServer = "amqp://localhost:5672"
connection = await amqp.connect(amqpServer)
channel = await connection.createChannel()
await channel.assertQueue(usersQueue)
channel.consume(usersQueue, msg => {
const token = JSON.parse(msg.content.toString()).token
channel.ack(msg)
if (token !== undefined amp;amp; token !== null) {
User.findOne({token: token}).exec((err, user) => {
if (err) {
return channel.sendToQueue(usersQueue, Buffer.from(JSON.stringify(err)))
} else {
return channel.sendToQueue(usersQueue, Buffer.from(JSON.stringify(user)))
}
})
}
})
} catch (err) {
console.log(err)
}
}
connect()
Вот моя история-сервис:
const connect = async () => {
try {
const amqpServer = "amqp://localhost:5672"
connection = await amqp.connect(amqpServer)
channel = await connection.createChannel()
await channel.assertQueue(historyQueue)
channel.consume(historyQueue, msg => {
const audioId = JSON.parse(msg.content.toString()).audioId
channel.ack(msg)
if (audioId !== undefined amp;amp; audioId !== null) {
History.find({audio: audioId}).exec((err, history) => {
if (err) {
return channel.sendToQueue(historyQueue, Buffer.from(JSON.stringify({err, status: false})))
} else {
return channel.sendToQueue(historyQueue, Buffer.from(JSON.stringify({history, status: true})))
}
})
}
})
} catch (err) {
console.log(err)
}
}
connect()
Вот мой аудио-сервис с api:
const connect = async () => {
try {
const amqpServer = "amqp://localhost:5672"
connection = await amqp.connect(amqpServer)
channel = await connection.createChannel()
await channel.assertQueue(usersQueue)
await channel.assertQueue(historyQueue)
} catch (err) {
console.log(err)
}
}
connect()
app.get("/get/analytics", (req, res) => {
const token = {token: req.headers.token}
if (req.headers.token !== undefined) {
channel.sendToQueue(usersQueue, Buffer.from(JSON.stringify(token)))
channel.consume(usersQueue, userMsg => {
const writer = JSON.parse(userMsg.content.toString())
channel.ack(userMsg)
if (writer._id != null) {
Audio.findOne({_id: req.headers.id, writer: writer._id, status: 10}).exec((err, audio) => {
if (err) {
return res.status(400).send(err.message)
} else {
channel.sendToQueue(historyQueue, Buffer.from(JSON.stringify({audioId: audio._id})))
channel.consume(historyQueue, historyMsg => {
channel.ack(historyMsg)
const history = JSON.parse(historyMsg.content.toString()).history
const noErr = JSON.parse(historyMsg.content.toString()).status
const errMessage = JSON.parse(historyMsg.content.toString()).err
if (noErr) {
res.status(200).json({audio, history})
} else {
res.status(500).json(errMessage)
}
})
}
})
} else {
return res.status(403).json("Token is false")
}
})
}
});
Комментарии:
1. Какой именно запрос вызывает ошибку в заголовках? И почему вы показываете три разных набора кода подключения? Какое это имеет отношение к вопросу?
2. @jfriend00, это три разных файла.
3. Какую библиотеку RabbitMQ вы используете? Пожалуйста, разместите ссылку на модуль и документацию.
4. @jfriend00, получить запрос api
5. Пожалуйста, прочтите оба моих вопроса еще раз. Ты не ответил ни на один из них.
Ответ №1:
Основываясь на некоторой документации RabbitMQ, которую я нашел, похоже, что channel.consume()
вызов настраивает потребителя для сообщений на этом канале, и этот потребитель длится так же долго, как и канал. Итак, при первом /get/analytics
попадании вы настраиваете потребителя для этого канала, а затем при втором попадании по этому маршруту вы настраиваете ДРУГОГО потребителя для того же канала, но первый все еще активен. Таким образом, когда сообщения поступают на этот канал, у вас будут дубликаты потребителей, обрабатывающих их, и, таким образом, вы в конечном итоге попытаетесь отправить ответы на старые запросы (от исходного потребителя, который все еще активен), и, следовательно, получите сообщение об ошибке.
Я совсем ничего не понимаю, что именно вы пытаетесь сделать здесь, но вы можете либо .cancel()
потребителю после того как вы закончите с ним, чтобы не остаться вокруг и по-прежнему получать будущие сообщения, или вы можете удалить создании потребительского изнутри вашему запросу обработчик только один когда-либо созданных и почему вашему запросу обработчик будет получать уведомления от потребителя у вас есть.
Примечание: Мне кажется, что эта общая структура будет зависеть от условий гонки всякий раз /get/analytics
, когда в игре одновременно задействовано несколько обработчиков маршрутов, поскольку невозможно определить, какой запрос, находящийся в игре, должен получать какие сообщения из очереди.
Кроме того, обратите внимание, что, когда ваше состояние:
if (req.headers.token !== undefined) {
не выполнено, то вы не отправляете никакого ответа на запрос. Вы должны отправлять какой-то ответ на каждый запрос, возможно, захотите отправить код статуса 4xx, возможно, 401, если отсутствующий токен означает, что они не авторизованы.
Для дальнейшей отладки вы можете добавить некоторые полезные записи в журнал:
const cntr = 0;
app.get("/get/analytics", (req, res) => {
// log which request we're starting and keep a localCntr
const localCntr = cntr ;
console.log(`Start of request #${localCntr}`);
const token = {token: req.headers.token}
if (req.headers.token !== undefined) {
channel.sendToQueue(usersQueue, Buffer.from(JSON.stringify(token)))
channel.consume(usersQueue, userMsg => {
console.log(`${localCntr}: Getting channel.consume() callback on usersQueue`);
const writer = JSON.parse(userMsg.content.toString())
channel.ack(userMsg)
if (writer._id != null) {
Audio.findOne({_id: req.headers.id, writer: writer._id, status: 10}).exec((err, audio) => {
console.log(`${localCntr}: Got Audio.findOne() result`);
if (err) {
return res.status(400).send(err.message)
} else {
channel.sendToQueue(historyQueue, Buffer.from(JSON.stringify({audioId: audio._id})))
channel.consume(historyQueue, historyMsg => {
console.log(`${localCntr}: Getting channel.consume() callback on historyQueue`);
channel.ack(historyMsg)
const history = JSON.parse(historyMsg.content.toString()).history
const noErr = JSON.parse(historyMsg.content.toString()).status
const errMessage = JSON.parse(historyMsg.content.toString()).err
console.log(`${localCntr}: Sending response`);
if (noErr) {
res.status(200).json({audio, history})
} else {
res.status(500).json(errMessage)
}
})
}
})
} else {
return res.status(403).json("Token is false")
}
})
}
});
Комментарии:
1. Привет, это все еще не работает. Я использую его вот так —
channel.consume(audioHistoryQueue, historyMsg => {channel.ack(historyMsg) channel.cancel(audioHistoryQueue)
2. @DaniilAndreev — Я не знаю API, с которым вы работаете. Я попросил ссылку на документ для любой библиотеки, которую вы используете, но вы этого не предоставили, поэтому на данный момент я не могу предложить больше.
3. Я использую amqplib. Вот ссылка на документы — squaremobius.net/amqp.node/channel_api.html
4. @DaniilAndreev — Я добавил кучу журналов отладки к примеру в своем ответе. Это регистрирует, какие именно обратные вызовы вы получаете, из какого места и с какого запроса # они поступили. Эти журналы, вероятно, сообщат вам, откуда поступают дополнительные обратные вызовы. При вставке этого журнала я вижу два отдельных вызова
channel.consume()
. Один на том,historyQueue
аusersQueue
другой на том . Вам придется сделать.cancel()
и то, и другое, когда вы закончите с данным запросом, чтобы избежать получения повторяющихся уведомлений о будущих событиях.5. @DaniilAndreev — Ключевая часть этих отладочных сообщений заключается
console.log(`${localCntr}: Some other text `);
в том, что каждое отладочное сообщение помечается порядковым номером, указывающим, от какого http-запроса отладочное сообщение. Я предполагаю, что вы видите отладочные сообщения от старых запросов, которые все еще встречаются, и это то, что вызывает ошибки.