#node.js #promise #pipeline
Вопрос:
Недавно я играл с композицией асинхронных функций в nodejs и понял, что было бы неплохо, если бы я мог обернуть их в конвейер, чтобы использовать их в другом месте. Каждый вывод предыдущей асинхронной функции будет вводиться для следующей. Для базовой реализации я использовал такой код:
pipeline.js
function run(tasks, input) {
let context = input;
const reducer = (acc, task) => acc.then((result) => {
context = resu<
return task(context);
};
return tasks.reduce(reducer, Promise.resolve(context));
}
tasks.js
/* do nothing, return input */
const task1 = async (context) => {
console.log(`previous output: ${context}`);
return context;
}
/* do nothing, return input */
const task2 = async (context) => {
console.log(`previous output: ${context}`);
return context;
}
...
test.js
/* import tasks -> tas1, task2, ... */
const tasks = require('tasks.js');
const pipeline = require('pipeline.js');
pipeline.run(tasks, null)
.then((output) => console.log('done')
.catch((err) => console.error(err));
где :
pipeline.js
: реализация трубопроводаtasks.js
: определения функций, из которых я могу составить конвейерtest.js
: код для проверки всего
Это основной сценарий, который будет соответствовать моим потребностям большую часть времени. Тем не менее, я пришел к выводу, что мне, скорее всего, потребуется ранний выход (то есть раннее завершение конвейера), когда будет выполнено определенное условие. Я не мог найти способ чисто достичь этого, и единственное, что сработало для меня, — это отклонить обещание с некоторой ошибкой, которая будет действовать как автоматический выключатель. Это подход, который я использовал:
pipeline.js
function run(tasks, input) {
let context = input;
/* added cancel closure */
const cancel = () => {
throw new PipelineCancelError();
}
const reducer = (acc, task) => acc.then((result) => {
context = resu<
return task(context);
};
return tasks.reduce(reducer, Promise.resolve(context));
}
таким образом, мои задачи получат дополнительную функцию в качестве аргумента, чтобы я мог отменить какой-то этап конвейера :
tasks.js
/* do nothing, return input */
const task3 = async (context, cancel) => {
console.log(`previous output: ${context}`);
if (someCondition) {
/* cancel pipeline */
cancel();
}
return context;
}
Мой вопрос: существует ли какой-то чистый подход для достижения раннего выхода и завершения конвейера, когда выполняется какое-то условие, без возникновения ошибки?
Кроме того, я нашел этот проект, который выглядит многообещающим ( express
обработчики стилей), но у меня возникают трудности с обработкой ошибок, когда задача выдает необработанную ошибку : обещание-конвейер
Обратите внимание, что я НЕ являюсь опытным разработчиком nodejs.
Спасибо!
Комментарии:
1. я бы не советовал
promise-pipeline
. мне кажется, что они сочетают стиль продолжения с обещаниями. и ему 6 лет, и он загружается 1 раз в неделю, что, вероятно, зависит только от пользователей пакетаpackage-json-to-template
.2. Это кажется проще с асинхронным/ожиданием
Ответ №1:
Вы забываете перейти cancel
к своему task
—
function run(tasks, input) {
let context = input;
/* added cancel closure */
const cancel = () => {
throw new PipelineCancelError();
}
const reducer = (acc, task) => acc.then((result) => {
context = resu<
return task(context); // <- pass cancel to task
};
return tasks.reduce(reducer, Promise.resolve(context));
}
Давайте посмотрим, как мы могли бы это исправить. Мы добавим простой sleep
для демонстрационных целей —
function run(tasks, input) {
const cancel = e => { throw e }
const reducer = (acc, task) => acc.then(result => task(result, cancel))
return tasks.reduce(reducer, Promise.resolve(input))
}
function sleep (ms) {
return new Promise(r => setTimeout(r, ms))
}
async function task1 (context, cancel) {
await sleep(1000)
console.log(`previous output: ${context}`)
return context.toUpperCase()
}
async function task2 (context, cancel) {
await sleep(1000)
console.log(`previous output: ${context}`)
return context "!"
}
async function task3 (context, cancel) {
await sleep(1000)
console.log(`previous output: ${context}`)
cancel(Error("task3 cancelled"))
return context "?"
}
run([task1, task2, task2], "hello").then(console.log, console.error)
previous output: hello
previous output: HELLO
previous output: HELLO!
HELLO!!
И вот демонстрация cancel
эффекта —
function run(tasks, input) {
const cancel = e => { throw e }
const reducer = (acc, task) => acc.then(result => task(result, cancel))
return tasks.reduce(reducer, Promise.resolve(input))
}
function sleep (ms) {
return new Promise(r => setTimeout(r, ms))
}
async function task1 (context, cancel) {
await sleep(1000)
console.log(`previous output: ${context}`)
return context.toUpperCase()
}
async function task2 (context, cancel) {
await sleep(1000)
console.log(`previous output: ${context}`)
return context "!"
}
async function task3 (context, cancel) {
await sleep(1000)
console.log(`previous output: ${context}`)
cancel(Error("task3 cancelled"))
return context "?"
}
run([task1, task2, task3, task2], "hello").then(console.log, console.error)
previous output: hello
previous output: HELLO
previous output: HELLO!
Error: "task3 cancelled"
Вы можете ответить на отмену, используя второй аргумент, чтобы .then
—
run([task1, task2, task3, task2], "hello")
.then(console.log, console.error) // <-
Или вы можете использовать .catch
—
run([task1, task2, task3, task2], "hello")
.then(console.log)
.catch(console.error) // <-
Комментарии:
1. Ах, да, совсем забыл
cancel
там. В любом случае спасибо за подробный ответ, но мне было интересно, есть ли способ добиться разрыва цепочки без возникновения ошибки.
Ответ №2:
Этого можно легко достичь, используя отменяемые обещания из моей библиотеки (c-promise2). Смотрите живую демонстрацию
import { CPromise } from "c-promise2";
function run(tasks, input) {
return tasks.reduce((acc, task) => acc.then(task), CPromise.resolve(input));
}
const task1 = async (context) => {
console.log(`[task 1] previous output: ${context}`);
await CPromise.delay(1000);
return "data1";
};
const task2 = async (context) => {
console.log(`[task 2] previous output: ${context}`);
await CPromise.delay(1000);
return "data2";
};
// Complex deeply cancellable task to show the benefits
const task3 = function* (context) {
try {
console.log(`[task 3] previous output: ${context}`);
yield CPromise.delay(1000);
console.log("[task 3] Downloading data...");
yield CPromise.delay(1000);
console.log("[task 3] Analyzing...");
yield CPromise.delay(1000);
console.log("[task 3] Uploading data...");
return "data3";
} finally {
yield CPromise.delay(1000);
console.log("[task 3] Deleting temporary data...");
}
};
const ts = Date.now();
const promise = run([task1, task2, task3], "input-data")
.then((output) => console.log(`Done: ${output}`))
.catch((err) => console.error(`Failed after[${Date.now() - ts}ms]: ${err}`));
setTimeout(() => promise.cancel(), 3500); // cancel the pipeline (promise)
Комментарии:
1. Я мог бы попробовать. Спасибо!
Ответ №3:
Вы можете использовать API потока узлов. Существуют методы остановки/возобновления/передачи/любого потока.
Комментарии:
1. К сожалению, это не помогает ответить на мой вопрос, поскольку я вообще не собираюсь использовать потоки на протяжении всего своего конвейера. В любом случае, спасибо.