RxJS наблюдаемые результаты из цепочки функций

#javascript #rxjs

Вопрос:

У меня есть функция getC , которая зависит от вывода getB , который зависит от вывода getA . Я хочу создать наблюдаемую stream , чтобы при подписке на нее я получал выходные данные каждой функции, как показано во фрагменте.

Есть ли в RxJS оператор, который будет вести себя так же, как в фрагменте? Или это гнездование switchMap , concat и of это единственный способ?

 const { concat, from, of } = rxjs
const { switchMap } = rxjs.operators


const getA = async() => new Promise(resolve => setTimeout(() => resolve({
  a: 'A'
}), 500));

const getB = async value => new Promise(resolve => setTimeout(() => resolve({
  a: value.a,
  b: 'B',
}), 500));

const getC = async value => new Promise(resolve => setTimeout(() => resolve({
  a: value.a,
  b: value.b,
  c: 'C',
}), 500));

const stream = from(getA()).pipe(
  switchMap(a => concat(
    of(a),
    from(getB(a)).pipe(
      switchMap(b => concat(
          of(b),
          getC(b)
        )
      )
    )
  ))
);

stream.subscribe(value => {
  console.log(value);
}); 
 <script src="https://unpkg.com/rxjs@7.3.0/dist/bundles/rxjs.umd.min.js"></script> 

Ответ №1:

Если вы заранее знаете свои зависимости, вы можете жестко закодировать их в разных потоках:

  1. c$ требуется последняя информация о выбросах из b$
  2. b$ требуется последняя информация о выбросах из a$
  3. Запустите потоки в «порядке», соблюдая их зависимости, чтобы в итоге вы получили три выброса, каждый (за исключением первого) в зависимости от предыдущего.
 const a$ = of('A').pipe(delay(600));
const b$ = of('B').pipe(delay(400), combineLatestWith(a$), map(([b, a]) => a   b));
const c$ = of('C').pipe(delay(200), combineLatestWith(b$), map(([c, b]) => b   c));
concat(a$, b$, c$).subscribe(x => console.log(x)); 
 <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.3.0/rxjs.umd.min.js" integrity="sha512-y3JTS47nnpKORJX8Jn1Rlm QgRIIZHtu3hWxal0e81avPrqUH48yk aCi gprT0RMAcpYa0WCkapxe bpBHD6g==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>
<script>
const {of, concat} = rxjs;
const {delay, combineLatestWith, map} = rxjs.operators;
</script> 

Комментарии:

1. Да, я думаю combineLatest , что это тот оператор, которого я искал. Таким образом, вам не нужно вставлять объявления. Примечание: В RxJS 7 так и должно быть combineLatestWith .

2. @EdI Вы правы; я пропустил уведомление об устаревании. Я обновил ответ для использования combineLatestWith .

Ответ №2:

Вы можете провести некоторый рефакторинг, чтобы сделать вашу функцию рекурсивной

 const getValue = (propertyName, value = null) => new Promise(resolve => 
  resolve(value ? {...value, [propertyName]: propertyName.toUpperCase()} : {[propertyName]: propertyName.toUpperCase()})
);
 

Затем вы можете использовать волшебный оператор rxjs.

  import { from, EMPTY } from 'rxjs';
 import { expand } from 'rxjs/operators';


from(getValue('a')).pipe(  
 expand((previousData) => {
     if(!previousData.hasOwnProperty('b')) {
       return from(getValue('b', previousData))
     } else if(!previousData.hasOwnProperty('c')) {
       return from(getValue('c', previousData))
     } else {
       return EMPTY
     }
   }
 )
).subscribe(console.log);
 

ВОТ рабочий код https://stackblitz.com/edit/typescript-czpotj

Результат журнала консоли

Комментарии:

1. Верно. expand это другой способ.

2. stackblitz.com/edit/typescript-czpotj?file=index.ts