#rxjs
#rxjs
Вопрос:
У меня есть process
функция, которая работает асинхронно — она берет строку входного текста и обрабатывает ее (во фрагменте ниже — для упрощения — она извлекает число из строки). Строки поступают из входного большого файла (много ГБ) и считываются и отправляются в канал один за другим (это «моделируется» of
оператором ниже). process
Функция работает медленно, но чтение строк выполняется быстро, поэтому критической точкой является чтение и обработка строк по очереди, чтобы избежать «переполнения стека» памяти.
const { of, Subject } = rxjs;
const { take, bufferCount, map, finalize } = rxjs.operators;
let source = of("line 11 of text", "line 22 of text", "line 33 of text") // large file with lines
.pipe(map(line => process(line, ()=> {
// ???? how te callback/pipe shoud look ????
}))
);
source.subscribe(x=> console.log(x));
// expected result shoud be:
// 11
// 22
// 33
// This is third-party library - I cannot change it
function process(line, callback) {
setTimeout(_=>{
let result = line.match(/d/)[0];
callback(result);
}, (1 Math.random()*9)*100); // random processing time
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg raeuc=" crossorigin="anonymous"></script>
Я не знаю, как это сделать — есть идеи?
Комментарии:
1. Поскольку вы не можете изменить сторонний метод, я сомневаюсь, что вы сможете справиться с этим с помощью rxjs. Основная проблема заключается в том, что api третьей части не возвращает наблюдаемый объект, который не соответствует базовым требованиям rxjs. Также каким-то образом мы можем управлять способом rxjs, используя операторы задержки / отмены, но опять же, потому что, как работает цикл событий JS, предоставленное решение не будет идеальным. Вы должны найти способ вернуть observable из стороннего API (если сможете)
2. @user2216584 Марчин дает решение — посмотрите на него
3. Спасибо! Для меня это новый урок.
Ответ №1:
Вам необходимо преобразовать обратный вызов в Promise, поскольку Promises могут работать с rxjs
потоками.
import { of } from "rxjs";
import { concatMap } from "rxjs/operators";
const promiseWrapper = line => new Promise(resolve => process(line, resolve));
let source = of("line 11 of text", "line 22 of text", "line 33 of text").pipe(
concatMap(item => promiseWrapper(item))
);
source.subscribe(x => console.log(x));
// expected result shoud be:
// 11
// 22
// 33
// This is third-party library - I cannot change it
function process(line, callback) {
setTimeout(_ => {
let result = line.match(/d /)[0];
callback(result);
}, 1000);
}
кроме того, я подготовил stackblitz (или фрагмент ниже)
const { of, Subject } = rxjs;
const { concatMap } = rxjs.operators;
const promiseWrapper = line => new Promise(resolve => process(line, resolve));
let source = of(
"line 11 of text",
"line 22 of text",
"line 33 of text") // large file with lines
.pipe(concatMap(item => promiseWrapper(item))
);
source.subscribe(x=> console.log(x));
// expected result shoud be:
// 11
// 22
// 33
// This is third-party library - I cannot change it
function process(line, callback) {
setTimeout(_=>{
let result = line.match(/d /)[0];
callback(result);
}, (1 Math.random()*9)*100);
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg raeuc=" crossorigin="anonymous"></script>
Ответ №2:
Вы можете использовать bindCallback
для преобразования функции типа f(x, callback)
в функцию g(x)
, которая возвращает холодную наблюдаемую, которая выдает результат, переданный обратному вызову.
import { bindCallback, of } from "rxjs";
import { concatMap } from "rxjs/operators";
const boundProcess = bindCallback(process);
let source = of("line 11 of text", "line 22 of text", "line 33 of text").pipe(
concatMap(line => boundProcess(line))
);
source.subscribe(x => console.log(x));
// expected result shoud be:
// 11
// 22
// 33
// This is third-party library - I cannot change it
function process(line, callback) {
setTimeout(_ => {
let result = line.match(/d /)[0];
callback(result);
}, 1000);
}