#javascript #node.js #redis
#javascript #node.js #redis
Вопрос:
У меня есть приложение узла, получающее запросы на допуск записей к другому приложению. Я пытаюсь решить проблему, при которой целевое приложение может обрабатывать только 1 запрос / 50 мс, в то время как приложение узла часто получает запросы каждые 30 мс. Я попытался реализовать этот пример, используя bee-queue и redis, чтобы все очереди использовали общий глобальный предел параллелизма. В приведенном ниже тестовом примере быстрые запросы (с интервалом 40 мс) должны обрабатываться каждые 5 секунд, но он по-прежнему ведет себя так, как будто два потока работают изолированно и обрабатывают все без каких-либо задержек.
Вывод:
// app launched, messages received back-to-back
node-app_1 | 2021-01-14T18:27:29: PM2 log: Launching in no daemon mode
node-app_1 | 2021-01-14T18:27:29: PM2 log: App [app:0] starting in -fork mode-
node-app_1 | 2021-01-14T18:27:29: PM2 log: App [app:0] online
node-app_1 | tcp interface listening on 8888
node-app_1 | Message Event: A04 (timestamp: 1610668454624)
node-app_1 | ************sending ack****************
node-app_1 | Message Event: A04 (timestamp: 1610668454653)
node-app_1 | ************sending ack****************
// message one queued, processed
node-app_1 | 901999 job published to queue
node-app_1 | 901999 limiter available, consuming job
node-app_1 | 901999 admission job triggered
// message two queued, processed
node-app_1 | 901028 job published to queue
node-app_1 | 901028 limiter available, consuming job
node-app_1 | 901028 admission job triggered
// jobs complete, records created
node-app_1 | 901999 creating new record: 205
node-app_1 | 901028 creating new record: 205
В случае успеха он примет первый как 205, а второй как 206. Ограничитель должен задерживать второй запрос на допуск («Доступен ограничитель 901028, запускающий допуск») до тех пор, пока не будет принят первый («901028 создает новую запись»).).
Вот строка, которая ставит задание в очередь:
const { queueJob } = require('../redcap_queue');
...
queueJob(data)
.then(() => console.log( `${data.visit_number} job published to queue` ))
.catch(() => console.log( "Record could not be admitted" ));
Обработчик очереди:
const Queue = require('bee-queue');
var redcapAPI = require('./redcapAPI.js');
var limiter = require('./bottleneck');
const options = {
removeOnSuccess: true,
redis: {
host: <redis_ip>,
port: 6379,
},
}
const redcapQueue = new Queue('redcap', options);
// Publisher
const queueJob = (data) => {
return redcapQueue.createJob(data).save();
};
// Consumer
redcapQueue.on('ready', async () => {
// wait for limiter
await limiter.limiter.schedule(async () => {
console.log(`${job.data.visit_number} limiter available, consuming job`);
// process job
redcapQueue.process(async job => {
redcapAPI.importNewRecord(job.data);
console.log(`${job.data.visit_number} admission job triggered`);
done();
})
})
})
module.exports.queueJob = queueJob;
Вот моя реализация узкого места (адаптирована из примера выше)
var Bottleneck = require("bottleneck");
const limiter = new Bottleneck({
id: 'my_global_limiter',
datastore: 'redis',
clearDatastore: false,
clientOptions: {
host: <redis_ip>,
port: 6379,
},
maxConcurrent: 1,
mintime: 5000
});
module.exports.limiter = limiter;