#javascript #node.js #queue #bull.js
#javascript #node.js #очередь #bull.js
Вопрос:
При выполнении приведенного ниже кода выводится слишком много результатов.
Я подозреваю, что completed
событие прослушивает все предыдущие задания, выполненные в текущем экземпляре очереди.
Как я могу управлять завершенным событием, чтобы прослушивать только текущее завершение задания?
producer.js
Производитель создает задание с числовым идентификатором по умолчанию и прослушивает глобальное завершение, чтобы вернуть ответ, когда задание будет выполнено.
const BullQ = require('bull');
let bullQ = BullQ('my-first-queue', {
redis: {
host: process.env.REDISHOST || 'localhost',
password: process.env.REDISPASSWORD || ''
}
});
app.get('/search/:term', async (req, res) => {
const job = await bullQ.add({
searchTerm: req.params.term
});
// Listen to the global completion of the queue in order to return result.
bullQ.on('global:completed', (jobId, result) => {
// Check if id is a number without any additions
if (/^d $/.test(jobId) amp;amp; !res.headersSent) {
console.log(`Producer get: Job ${jobId} completed! Result: ${result}`);
res.json(`Job is completed with result: ${result}`);
}
});
});
consumer.js
У потребителя есть 2 роли.
- Использовать задания так, как это должно быть по книге
- Создавать новые задания на основе результата последнего задания.
const BullQ = require('bull');
let bullQ = BullQ('my-first-queue', {
redis: {
host: process.env.REDISHOST || 'localhost',
password: process.env.REDISPASSWORD || ''
}
});
bullQ.process((job, done) => {
// Simulate asynchronous server request.
setTimeout(async () => {
// Done the first job and return an answer to the producer after the timeout.
done(null, `Search result for ${job.data.searchTerm}`);
// next job run
if (counter < 10) {
// For the first run the id is just a number if not changed via the jobId in JobOpts,
// the next time the job id will be set to {{id}}_next_{{counter}} we need only the first number in order not to end with a long and not clear concatenated string.
let jobID = (/^d $/.test(job.id)) ? job.id : job.id.replace(/[^d].*/,'');
await createNextJob(jobID, counter);
}
}, 100);
});
// Create next job and add it to the queue.
// Listen to the completed jobs (locally)
const createNextJob = async (id, counter) => {
const nextJob = bullQ.add({
searchTerm: "Next job"
}, {
jobId: `${id}_next_${counter}`
});
await bullQ.on('completed', (job, result) => {
job.finished();
console.log(`Consumer(next): Job ${job.id} completed! Result: ${result}`);
});
};
Комментарии:
1. эй, попробуйте добавить
bullQ.on('global:failed', (jobid,data){console.log(jobid,data)});
в конце, потому что я обнаружил, что у меня была ошибка в моей функции задания, о которой не сообщалось, вызывающая эту проблему
Ответ №1:
Вы можете await job.finished()
получить свой результат, специфичный для job
объекта, возвращаемого queue.add()
.
Вот простой, доступный для выполнения пример без выражения для иллюстрации:
const Queue = require("bull"); // "bull": "^4.8.5"
const sleep = (ms=1000) =>
new Promise(resolve => setTimeout(resolve, ms))
;
const queue = new Queue("test", process.env.REDIS_URL);
queue.process(4, async job => {
await sleep(job.data.seconds * 1000); // waste time
return Promise.resolve(`job ${job.id} complete!`);
});
(async () => {
const job = await queue.add({seconds: 5});
const result = await job.finished();
console.log(result); // => job 42 complete!
await queue.close();
})();
С помощью Express:
const Queue = require("bull");
const express = require("express");
const sleep = (ms=1000) =>
new Promise(resolve => setTimeout(resolve, ms))
;
const queue = new Queue("test", process.env.REDIS_URL);
queue.process(4, async job => {
await sleep(job.data.seconds * 1000);
return Promise.resolve(`job ${job.id} complete!`);
});
const app = express();
app
.set("port", process.env.PORT || 5000)
.get("/", async (req, res) => {
try {
const job = await queue.add({
seconds: Math.abs( req.query.seconds) || 10,
});
const result = await job.finished();
res.send(result);
}
catch (err) {
res.status(500).send({error: err.message});
}
})
.listen(app.get("port"), () =>
console.log("app running on port", app.get("port"))
)
;
Пример запуска:
$ curl localhost:5000?seconds=2
job 42 complete!
Комментарии:
1. Большое вам спасибо! Не уверен, почему я не видел
job.finished()
адекватно документированных данных в другом месте. Это гораздо лучший подход для определенных ситуаций!2. Эй, я в замешательстве, почему вы хотите ожидать фоновое задание? Разве цель задания не в том, чтобы освободить поток узла и сократить время отклика? ожидание задания приведет к тому, что время отклика будет таким же, как если бы это не было фоновым заданием.
3. Конечно, время завершения задания будет таким же, но поток будет свободен для работы с другими запросами по мере завершения фоновой задачи. Вот почему это асинхронно. Для некоторых фоновых заданий допустимо задерживать HTTP-ответ или отправлять сообщение сокета или SSE по завершении из той же области экспресс-обработчика. Это полностью зависит от вашего варианта использования; мой пример придуман, чтобы проиллюстрировать инструмент.