Обрабатывать данные в канале асинхронным способом

#rxjs

#rxjs

Вопрос:

У меня есть process функция, которая работает асинхронно — она берет строку входного текста и обрабатывает ее (во фрагменте ниже — для упрощения — она извлекает число из строки). Строки поступают из входного большого файла (много ГБ) и считываются и отправляются в канал один за другим (это «моделируется» of оператором ниже). process Функция работает медленно, но чтение строк выполняется быстро, поэтому критической точкой является чтение и обработка строк по очереди, чтобы избежать «переполнения стека» памяти.

 const { of, Subject } = rxjs;
const { take, bufferCount, map, finalize } = rxjs.operators;


let source = of("line 11 of text", "line 22 of text", "line 33 of text") // large file with lines
  .pipe(map(line => process(line, ()=> { 
    // ???? how te callback/pipe shoud look ????
  }))
);

source.subscribe(x=> console.log(x)); 
// expected result shoud be: 
// 11
// 22
// 33
  

// This is third-party library - I cannot change it
function process(line, callback) {
  setTimeout(_=>{ 
    let result = line.match(/d/)[0];
    callback(result);
  }, (1 Math.random()*9)*100); // random processing time
}  
 <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg raeuc=" crossorigin="anonymous"></script>  

Я не знаю, как это сделать — есть идеи?

Комментарии:

1. Поскольку вы не можете изменить сторонний метод, я сомневаюсь, что вы сможете справиться с этим с помощью rxjs. Основная проблема заключается в том, что api третьей части не возвращает наблюдаемый объект, который не соответствует базовым требованиям rxjs. Также каким-то образом мы можем управлять способом rxjs, используя операторы задержки / отмены, но опять же, потому что, как работает цикл событий JS, предоставленное решение не будет идеальным. Вы должны найти способ вернуть observable из стороннего API (если сможете)

2. @user2216584 Марчин дает решение — посмотрите на него

3. Спасибо! Для меня это новый урок.

Ответ №1:

Вам необходимо преобразовать обратный вызов в Promise, поскольку Promises могут работать с rxjs потоками.

 import { of } from "rxjs";
import { concatMap } from "rxjs/operators";

const promiseWrapper = line => new Promise(resolve => process(line, resolve));

let source = of("line 11 of text", "line 22 of text", "line 33 of text").pipe(
  concatMap(item => promiseWrapper(item))
);

source.subscribe(x => console.log(x));
// expected result shoud be:
// 11
// 22
// 33

// This is third-party library - I cannot change it
function process(line, callback) {
  setTimeout(_ => {
    let result = line.match(/d /)[0];
    callback(result);
  }, 1000);
}
  

кроме того, я подготовил stackblitz (или фрагмент ниже)

 const { of, Subject } = rxjs;
const { concatMap } = rxjs.operators;

const promiseWrapper = line => new Promise(resolve => process(line, resolve));

let source = of(
  "line 11 of text", 
  "line 22 of text", 
  "line 33 of text") // large file with lines
  .pipe(concatMap(item => promiseWrapper(item))
);

source.subscribe(x=> console.log(x)); 
// expected result shoud be: 
// 11
// 22
// 33
  

// This is third-party library - I cannot change it
function process(line, callback) {
  setTimeout(_=>{ 
    let result = line.match(/d /)[0];
    callback(result);
  }, (1 Math.random()*9)*100);
}  
 <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg raeuc=" crossorigin="anonymous"></script>  

Ответ №2:

Вы можете использовать bindCallback для преобразования функции типа f(x, callback) в функцию g(x) , которая возвращает холодную наблюдаемую, которая выдает результат, переданный обратному вызову.

 import { bindCallback, of } from "rxjs";
import { concatMap } from "rxjs/operators";

const boundProcess = bindCallback(process);
let source = of("line 11 of text", "line 22 of text", "line 33 of text").pipe(
  concatMap(line => boundProcess(line))
);

source.subscribe(x => console.log(x));
// expected result shoud be:
// 11
// 22
// 33

// This is third-party library - I cannot change it
function process(line, callback) {
  setTimeout(_ => {
    let result = line.match(/d /)[0];
    callback(result);
  }, 1000);
}
  

https://stackblitz.com/edit/typescript-2rai4c?file=index.ts