Наблюдаемые объекты Angular 2 rxjs, созданные из BehaviorSubject, не работают с forkJoin

#angular #rxjs #behaviorsubject

#angular #rxjs #behaviorsubject

Вопрос:

Я пытаюсь использовать Observable.forkJoin , и обработчик подписки никогда не попадает. Оператор forkJoin работает у меня в других частях моего приложения, и единственное отличие, о котором я могу подумать в нерабочем сценарии, заключается в том, что наблюдаемые объекты создаются из BehaviorSubject объектов с использованием его asObservable() функции.

Эта подписка попадает

     let obs = Observable.of(1);

    Observable.forkJoin(
        obs
    ).subscribe(data => {
        console.log(data);
    });
  

Этот не работает

     let bs = new BehaviorSubject<number>(1);
    let obs = bs.asObservable();

    Observable.forkJoin(
        obs
    ).subscribe(data => {
        console.log(data);
    });
  

Конечно, в моем реальном варианте использования существует более одного наблюдения, поэтому я в первую очередь использую forkJoin.

Есть ли что-то еще, что нужно сделать с BehaviorSubject, чтобы заставить его работать с forkJoin?

Обновить:

После более детального изучения документов RxJS я понял, что Observable.combineLatest они намного лучше подходят для моих нужд, чем forkJoin … Ссылка здесь на случай, если кто-нибудь наткнется на этот пост SO: http://reactivex.io/rxjs/class/es6/Observable.js ~ Observable.html#static-method-combineLatest

Ответ №1:

Проблема в том, что forkJoin наблюдаемые объекты соединяются при их завершении.

В вашем первом фрагменте вы создаете наблюдаемый объект с помощью of , который после subscribe немедленно выдает значение и затем завершается.

В вашем втором фрагменте BehaviorSubject не завершается. Если бы вы вызвали complete , вы бы увидели значение, записанное в консоль:

 let bs = new BehaviorSubject<number>(1);
let obs = bs.asObservable();

Observable.forkJoin(
    obs
).subscribe(data => {
    console.log(data);
});

bs.complete();
  

Комментарии:

1. Спасибо за ответ. После более детального изучения документов RxJS я понял, что Observable.combineLatest они намного лучше подходят для моих нужд, чем forkJoin … Ссылка здесь на случай, если кто-нибудь наткнется на этот пост SO: reactivex.io/rxjs/class/es6 /…

Ответ №2:

Вы можете использовать канал take (1), если вы не хотите использовать метод complete.

 const subjectA = new BehaviorSubject<number>(0);
const a$ = subjectA.asObservable();

Observable.forkJoin(
    a$.pipe(take(1))
).subscribe(data => {
    console.log(data);
});