Как объединить потоки в rxjs?

#javascript #rxjs

#javascript #rxjs

Вопрос:

Я пытаюсь получить данные с сервера, используя rxjs:

 class HolidaysService {
    constructor() {
        this.restService = newRestService();
    }

    public getHolidaysByYear(year: number): Observable<Date[]> {
        const endpoint = `/get${year}`;
        return this.restService.GET(endpoint, { cache: true })
            .pipe(map(dates: any)=> dates.map((item: string) => new Date(item)))
    }

    public getHolidaysByYears(years: number[]): Observable<Date[]> {
        const requests = years.reduce((acc,v)=>acc.concat(this.getHolidaysByYear(v)),[])
        return forkJoin(...requests).pipe(scan((acc,v)=>acc.concat(v), []))
    }

}

// We call it:
const holidayService = new HolidaysService();
const substriction1 = holidayService.getHolidaysByYear(2020)
    .subscribe(
        res => console.log('res', res),
        err => console.warn(err)
    );
substriction1.unsunscribe();
  

Консоль отображает res и сами данные. Теперь я хочу получить информацию за несколько лет, используя rxjs-аналог promise.all:

 const substriction2 = holidayService.getHolidaysByYears([2019,2020])
    .subscribe(
        res => console.log('res', res),
        err => console.warn(err)
    );
substriction2.unsunscribe();
  

И ничего не работает. Я не вижу res в консоли и данных. В чем может быть ошибка?

Ответ №1:

Вот как вы бы написали Promise.all аналог в RxJS с forkJoin :

 public getHolidaysByYears(years: number[]): Observable<Date[]> {
    const requests = years.map(year => this.getHolidaysByYear(year));
    return forkJoin(requests).pipe(concatAll());
}
  

Обратите внимание, что я использую concatAll operator , потому forkJoin(requests) что это даст вам Observable<Date[][]> массив праздничных дат в год. Поскольку ваш возвращаемый тип равен Observable<Date[]> , именно поэтому я использую concatAll operator для объединения результирующих массивов и получения Observable<Date[]> вместо Observable<Date[][]> .