Как запретить bull queue автоматически перезапускать задания при перезапуске сервера

#javascript #node.js #queue #worker #bull-queue

#javascript #node.js #очередь #рабочий #bull-queue

Вопрос:

Я использую bull queue для обработки заданий.

Допустим, задание выполняется со статусом active , когда я перезапускаю свой сервер разработки. Когда рабочий скрипт запускается снова, процесс по-прежнему находится active в очереди, поэтому bull решает снова запустить рабочий процесс.

Это очень быстро приводит к сбоям, поскольку скрипт часто перезапускается во время разработки, поэтому многие процессы в конечном итоге запускаются и создают беспорядок. Все, что я хочу, это чтобы bull НЕ перезапускал эти задания при запуске сервера.

Вещи, которые я пробовал:

     let active_jobs = await queue.getJobs(['active']);
    active_jobs.forEach(async (active_job) => {
      await active_job.discard()
      await active_job.moveToFailed(new Error("Auto-killed during dev server restart"))
    })
  

Ничего из этого не работает. У кого-нибудь есть решение для достижения этой цели?

Ответ №1:

попробуйте

 queue.obliterate({force: true});
  

после этого очередь приостанавливается, поэтому ее необходимо перезапустить.

Комментарии:

1. это отлично работало с BullMQ

Ответ №2:

На самом деле это просто, если у вас есть необходимые функции. Вы могли бы реализовать некоторый код для полной очистки определенных очередей. Этот код должен находиться в месте, которое вызывается один раз при запуске вашего сервера. Таким образом, хорошим местом будет конструктор вашего класса (сервера-производителя). Таким образом, вы всегда начинаете с чистой очереди с нулевыми записями для целей разработки. Поэтому лучше всего обернуть вызов функции для очистки очередей в какой-либо оператор, который проверяет условия разработки.

Вы можете использовать что-то вроде следующего:

 const getKeys = async (q) => {
  const multi = q.multi();
  multi.keys('*');
  const keys = await multi.exec();
  return keys[0][1]
}

const filterQueueKeys = (q, keys) => {
  const prefix = `${q.keyPrefix}:${q.name}`;
  return keys.filter(k => k.includes(prefix));
}

const deleteKeys = async (q, keys) => {
  const multi = q.multi();
  keys.forEach(k => multi.del(k));
  await multi.exec();
}

const emptyQueue = async (q) => {  
  const keys = await getKeys(q);
  const queueKeys = filterQueueKeys(q, keys);
  await deleteKeys(q, queueKeys);
}

if (process.env === 'development') {
    emptyQueue(this.workerQueue).then(() => {
      console.log('QUEUE EMPTY!')
    })
}