#node.js #mongoose #orm #rabbitmq #node-amqplib
Вопрос:
У меня есть этот рабочий файл:
const { orm } = require('../../models');
const config = require('../../config');
const rabbitServices = require('../rabbitMQ');
const router = require('./listenerRouter');
(async () => {
// connect to mongoose db (work fine)
await orm.connect(config.db);
if (config.rabbitMQ.enabled) {
await rabbitServices.connect(config.rabbitMQ);
await rabbitServices.createListener(config.dataSync.queue, router);
}
console.log('[DATASYNC] Worker started');
})();
И у меня есть сервис rabbit (проблема здесь в функции addListener):
const rabbit = require('amqplib');
const SomeModel = require('../../models/SomeModel');
let rabbitConnection;
let channel;
const listeners = [];
const _createChannel = async () => {
let newChannel;
try {
newChannel = await rabbitConnection.createChannel();
channel = newChannel;
} catch (error) {
console.error('[AMQP]channel', error.message);
return;
}
newChannel.on('close', function () {
console.error('[AMQP] close chanel');
channel = null;
});
console.log('[AMQP] create chanel');
};
const addListener = async (queue, callback, options = {}) => {
const { ack } = options;
const removeMessage = (msg) => () => channel.ack(msg);
if (!channel) {
console.error('[AMQP] chanel is closed, Listener no created');
return;
}
await channel.assertQueue(queue, {
durable: false
});
// This db-query work fine
const seccessResult = await SomeModel.find({});
return await channel.consume(queue, async (msg) => {
if (msg !== null) {
// But this db-query just aborting whitout any message
const neverResult = await SomeModel.find({});
callback(msg, {removeMessage: removeMessage(msg)});
if (ack) { channel.ack(msg); }
}
});
};
const connect = async (options) => {
const {login, password, host, reconnect_time} = options;
const URL = `amqp://${login}:${password}@${host}`;
let conn = null;
try {
conn = await rabbit.connect(URL);
} catch (error) {
console.error('[AMQP]connect', error.message);
return setTimeout(connect, reconnect_time, options);
}
conn.on('error', function (err) {
if (err.message !== 'Connection closing') {
console.error('[AMQP] conn error', err.message);
}
});
conn.on('close', function () {
console.error('[AMQP] reconnecting');
rabbitConnection = null;
return setTimeout(connect, reconnect_time, options);
});
console.log(`RabbitMQ connection established on ${host}`);
rabbitConnection = conn;
await _createChannel();
if (listeners.length) {
await Promise.all(listeners.map(listener => addListener(
listener.queue, listener.callback, listener.options
)));
}
};
const sendMessage = async (queue, message) => {
if (!channel) {
console.error('[AMQP] chanel is closed, message not sended');
return;
}
await channel.assertQueue(queue);
await channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
persistent: true
});
console.log(`RabbitMQ send message to queue ${message}`);
};
const createListener = async (queue, callback, options = {}) => {
listeners.push({queue, callback, options});
return await addListener(queue, callback, options);
};
module.exports = {
connect,
sendMessage,
createListener
};
Я просто не могу понять, почему orm не работает внутри этого обратного вызова. Я провел весь день в поисках решения этой проблемы, но до сих пор не могу его найти. Как я могу добиться выполнения запросов к базе данных внутри прослушивателя?