Behaviorsubject и of() ведут себя совершенно по-разному

#angular #rxjs #observable #angular9 #behaviorsubject

#angular #rxjs #наблюдаемый #angular9 #behaviorsubject

Вопрос:

Я пытаюсь воссоздать цепочку обработки наблюдаемых, где первым шагом должен быть subject (мне нужно вызвать .next() ). Использование of () работает, но возвращает Observable, использование BehaviorSubject должно иметь аналогичные эффекты, но это не работает: observable, созданный с помощью of, отлично работает, и подписка передает данные по каналам и возвращает измененные данные, в то время как с behaviorsubject данные остаются в subject, а подписка никогда не получает данные.

Пример:

 getProcessed(processed: string = null, identifier = 'default'): Observable<any> {
  const bs = new BehaviorSubject(this.start.data);
  this.localFilterSub.set(identifier, bs);
  this.localFilterObs.set(identifier, bs.asObservable());
  this.localFilterSet.set(identifier, {});
  this.process(processed, identifier);
  return this.localFilterObs.get(identifier);
}

process(name: string, identifier = 'default') {
  this.localFilterObs.set(identifier, this.doProcess(name, identifier));
}


private doProcess(name: string, identifier = 'default'): Observable<any>|Subject<any> {
  if (name) {
    const inst = new Op();

    const obss = [];
    obss.push(this.localFilterObs.get(identifier));
    obss.push(inst.getExternal());

    return forkJoin(obss).pipe(
      tap((data) => {
        console.log(name, data);
      }),
      map((data) => {
        return inst?.run(data);
        // this.done.push(name);
      }),
      tap((data) => {
        console.log(name, data);
      }),
    );
  } 
}
  

Я действительно не понимаю, что я делаю не так.

Комментарии:

1. Что означает «пока это не работает»? Что не работает? Что вы ожидаете, что произойдет, и что происходит на самом деле? A BehaviorSubject не будет вести себя точно так же, как observable, который вы создали с помощью of .

2. почему? наблюдаемый объект, созданный с помощью of, был бы наблюдаемым объектом, имеющим заданный набор данных, а behaviorsubject был бы субъектом (который является наблюдаемым), который выдает данные, как только на него подписывается. почему должна быть разница?

3. тем временем я отредактировал вопрос.

Ответ №1:

forkJoin не генерирует, пока не завершатся все наблюдаемые. of завершится сразу, но объект поведения не завершится, пока вы не вызовете complete. Используйте combineLatest, и он выдаст, как только будут выданы все наблюдаемые.

 const { BehaviorSubject, of, forkJoin } = rxjs;

const bs$ = new BehaviorSubject('bs');

o$ = of('of')

forkJoin([bs$, o$]).subscribe(res => { console.log(res); });

console.log('Nothing yet as bs$ not complete');

setTimeout(() => { bs$.complete(); }, 2000);  
 <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.2/rxjs.umd.min.js"></script>  

но combineLatest сразу же выдаст

 const { BehaviorSubject, of, combineLatest } = rxjs;

const bs$ = new BehaviorSubject('bs');

o$ = of('of')

combineLatest([bs$, o$]).subscribe(res => { console.log(res); });  
 <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.2/rxjs.umd.min.js"></script>