Обещания Nodejs — составьте конвейер из асинхронных функций в последовательности с поддержкой раннего выхода

#node.js #promise #pipeline

Вопрос:

Недавно я играл с композицией асинхронных функций в nodejs и понял, что было бы неплохо, если бы я мог обернуть их в конвейер, чтобы использовать их в другом месте. Каждый вывод предыдущей асинхронной функции будет вводиться для следующей. Для базовой реализации я использовал такой код:

pipeline.js

 
function run(tasks, input) {
  let context = input;

  const reducer = (acc, task) => acc.then((result) => {
    context = resu<
    return task(context);
  };

  return tasks.reduce(reducer, Promise.resolve(context));
}

 

tasks.js

 /* do nothing, return input */
const task1 = async (context) => {
  console.log(`previous output: ${context}`);
  return context;
}

/* do nothing, return input */
const task2 = async (context) => {
  console.log(`previous output: ${context}`);
  return context;
}

...

 

test.js

 /* import tasks -> tas1, task2, ... */
const tasks = require('tasks.js');
const pipeline = require('pipeline.js');

pipeline.run(tasks, null)
  .then((output) => console.log('done')
  .catch((err) => console.error(err));
 

где :

  • pipeline.js : реализация трубопровода
  • tasks.js : определения функций, из которых я могу составить конвейер
  • test.js : код для проверки всего

Это основной сценарий, который будет соответствовать моим потребностям большую часть времени. Тем не менее, я пришел к выводу, что мне, скорее всего, потребуется ранний выход (то есть раннее завершение конвейера), когда будет выполнено определенное условие. Я не мог найти способ чисто достичь этого, и единственное, что сработало для меня, — это отклонить обещание с некоторой ошибкой, которая будет действовать как автоматический выключатель. Это подход, который я использовал:

pipeline.js

 function run(tasks, input) {
  let context = input;

  /* added cancel closure */
  const cancel = () => {
    throw new PipelineCancelError();
  }

  const reducer = (acc, task) => acc.then((result) => {
    context = resu<
    return task(context);
  };

  return tasks.reduce(reducer, Promise.resolve(context));
}
 

таким образом, мои задачи получат дополнительную функцию в качестве аргумента, чтобы я мог отменить какой-то этап конвейера :

tasks.js

 /* do nothing, return input */
const task3 = async (context, cancel) => {
  console.log(`previous output: ${context}`);
  if (someCondition) {
    /* cancel pipeline */
    cancel();
  }
  return context;
}
 

Мой вопрос: существует ли какой-то чистый подход для достижения раннего выхода и завершения конвейера, когда выполняется какое-то условие, без возникновения ошибки?

Кроме того, я нашел этот проект, который выглядит многообещающим ( express обработчики стилей), но у меня возникают трудности с обработкой ошибок, когда задача выдает необработанную ошибку : обещание-конвейер

Обратите внимание, что я НЕ являюсь опытным разработчиком nodejs.

Спасибо!

Комментарии:

1. я бы не советовал promise-pipeline . мне кажется, что они сочетают стиль продолжения с обещаниями. и ему 6 лет, и он загружается 1 раз в неделю, что, вероятно, зависит только от пользователей пакета package-json-to-template .

2. Это кажется проще с асинхронным/ожиданием

Ответ №1:

Вы забываете перейти cancel к своему task

 function run(tasks, input) {
  let context = input;

  /* added cancel closure */
  const cancel = () => {
    throw new PipelineCancelError();
  }

  const reducer = (acc, task) => acc.then((result) => {
    context = resu<
    return task(context); // <- pass cancel to task
  };

  return tasks.reduce(reducer, Promise.resolve(context));
}
 

Давайте посмотрим, как мы могли бы это исправить. Мы добавим простой sleep для демонстрационных целей —

 function run(tasks, input) {
  const cancel = e => { throw e }
  const reducer = (acc, task) => acc.then(result => task(result, cancel))
  return tasks.reduce(reducer, Promise.resolve(input))
}

function sleep (ms) {
  return new Promise(r => setTimeout(r, ms))
}

async function task1 (context, cancel) {
  await sleep(1000)
  console.log(`previous output: ${context}`)
  return context.toUpperCase()
}

async function task2 (context, cancel) {
  await sleep(1000)
  console.log(`previous output: ${context}`)
  return context   "!"
}

async function task3 (context, cancel) {
  await sleep(1000)
  console.log(`previous output: ${context}`)
  cancel(Error("task3 cancelled"))
  return context   "?"
}

run([task1, task2, task2], "hello").then(console.log, console.error) 
 previous output: hello
previous output: HELLO
previous output: HELLO!
HELLO!!
 

И вот демонстрация cancel эффекта —

 function run(tasks, input) {
  const cancel = e => { throw e }
  const reducer = (acc, task) => acc.then(result => task(result, cancel))
  return tasks.reduce(reducer, Promise.resolve(input))
}

function sleep (ms) {
  return new Promise(r => setTimeout(r, ms))
}

async function task1 (context, cancel) {
  await sleep(1000)
  console.log(`previous output: ${context}`)
  return context.toUpperCase()
}

async function task2 (context, cancel) {
  await sleep(1000)
  console.log(`previous output: ${context}`)
  return context   "!"
}

async function task3 (context, cancel) {
  await sleep(1000)
  console.log(`previous output: ${context}`)
  cancel(Error("task3 cancelled"))
  return context   "?"
}

run([task1, task2, task3, task2], "hello").then(console.log, console.error) 
 previous output: hello
previous output: HELLO
previous output: HELLO!
Error: "task3 cancelled"
 

Вы можете ответить на отмену, используя второй аргумент, чтобы .then

 run([task1, task2, task3, task2], "hello")
  .then(console.log, console.error) // <-
 

Или вы можете использовать .catch

 run([task1, task2, task3, task2], "hello")
  .then(console.log)
  .catch(console.error) // <-
 

Комментарии:

1. Ах, да, совсем забыл cancel там. В любом случае спасибо за подробный ответ, но мне было интересно, есть ли способ добиться разрыва цепочки без возникновения ошибки.

Ответ №2:

Этого можно легко достичь, используя отменяемые обещания из моей библиотеки (c-promise2). Смотрите живую демонстрацию

 import { CPromise } from "c-promise2";

function run(tasks, input) {
  return tasks.reduce((acc, task) => acc.then(task), CPromise.resolve(input));
}

const task1 = async (context) => {
  console.log(`[task 1] previous output: ${context}`);
  await CPromise.delay(1000);
  return "data1";
};

const task2 = async (context) => {
  console.log(`[task 2] previous output: ${context}`);
  await CPromise.delay(1000);
  return "data2";
};

// Complex deeply cancellable task to show the benefits
const task3 = function* (context) {
  try {
    console.log(`[task 3] previous output: ${context}`);
    yield CPromise.delay(1000);
    console.log("[task 3] Downloading data...");
    yield CPromise.delay(1000);
    console.log("[task 3] Analyzing...");
    yield CPromise.delay(1000);
    console.log("[task 3] Uploading data...");
    return "data3";
  } finally {
    yield CPromise.delay(1000);
    console.log("[task 3] Deleting temporary data...");
  }
};

const ts = Date.now();

const promise = run([task1, task2, task3], "input-data")
  .then((output) => console.log(`Done: ${output}`))
  .catch((err) => console.error(`Failed after[${Date.now() - ts}ms]: ${err}`));

setTimeout(() => promise.cancel(), 3500); // cancel the pipeline (promise)
 

Комментарии:

1. Я мог бы попробовать. Спасибо!

Ответ №3:

Вы можете использовать API потока узлов. Существуют методы остановки/возобновления/передачи/любого потока.

https://nodejs.org/api/stream.html

Комментарии:

1. К сожалению, это не помогает ответить на мой вопрос, поскольку я вообще не собираюсь использовать потоки на протяжении всего своего конвейера. В любом случае, спасибо.