Обрабатывать асинхронные сообщения в рабочих потоках

#node.js #async-await #web-worker

#node.js #асинхронное ожидание #веб-рабочий

Вопрос:

Мы используем NodeJS experimental workers для выполнения некоторых задач с интенсивным использованием процессора. Эти задачи запускаются с помощью сообщений при parentPort передаче сообщений. Во время работы потоков им необходимо сохранять данные в базе данных, которая представляет собой асинхронную операцию, подкрепленную обещаниями.

Мы видим, что parentPort сообщения продолжают отправляться в функцию-обработчик, пока мы выполняем асинхронные операции.

Пример кода, который мы делаем:

 const { parentPort, Worker, isMainThread } = require('worker_threads');

if (isMainThread) {
  const worker = new Worker(__filename);

  const i = [1, 2, 3, 4, 5, 6, 7, 8, 9];
  for (const x of i) {
    worker.postMessage({ idx: x });
  }
} else {
  parentPort.on('message', async (value) => {
    await testAsync(value);
  });
}

async function testAsync(value) {
  return new Promise((resolve) => {
    console.log(`Starting wait for ${value.idx}`);
    setTimeout(() => {
      console.log(`Complete resolve for ${value.idx}`);
      resolve();

      if(value.idx == 9) {
        setTimeout(() => process.exit(0), 2000);
      }
    }, 500);
  });
}
  

В приведенном выше примере мы видим Starting wait for ... печать перед появлением любых Complete resolve ... сообщений. С async-await мы ожидали, что обработчик события будет ждать разрешенного обещания перед обработкой нового события. В реальном примере может произойти сбой подключения к базе данных, что приводит к возникновению исключения, поэтому мы хотим убедиться, что текущее сообщение было полностью обработано, прежде чем принимать новое.

Мы делаем здесь что-нибудь не так?

Если нет, то есть ли какой-либо способ достижения желаемой цели обработки события по порядку?

Ответ №1:

Кажется, что вы хотите ставить сообщения в очередь и обрабатывать только одну вещь за раз.

parentPort.on('message', () => {} является прослушивателем событий, когда событие запускается, оно не будет ждать, пока не будет выполнена предыдущая асинхронная операция внутри обратного вызова.

Итак, если вы запускаете 'message' тысячу раз, testAsync будет выполняться тысячу раз без ожидания.

Вам необходимо реализовать очередь в рабочем потоке и ограничить параллелизм. В NPM есть несколько пакетов очереди обещаний.

Я буду использовать p-queue в этом примере.

 const PQueue = require('p-queue');

const { parentPort, Worker, isMainThread } = require('worker_threads');

if (isMainThread) {
  const worker = new Worker(__filename);

  const i = [1, 2, 3, 4, 5, 6, 7, 8, 9];
  for (const x of i) {
    worker.postMessage({ idx: x });
  }
} else {

    const queue = new PQueue({ concurrency: 1 }); // set concurrency

    parentPort.on('message', value => {
      queue.add(() => testAsync(value));
    });
}

async function testAsync(value) {
  return new Promise((resolve) => {
    console.log(`Starting wait for ${value.idx}`);
    setTimeout(() => {
      console.log(`Complete resolve for ${value.idx}`);
      resolve();

      if(value.idx == 9) {
        setTimeout(() => process.exit(0), 2000);
      }
    }, 500);
  });
}
  

Теперь вывод будет:

 starting wait for 1
complete resolve for 1

starting wait for 2
complete resolve for 2

starting wait for N
complete resolve for N