Угловые рекурсивные HTTP-запросы, объединяющие ответы, в конечном итоге возвращающие единственную наблюдаемую

#angular #rxjs #observable #angular-http

#угловой #rxjs #наблюдаемая #angular-http

Вопрос:

Я хотел бы выполнять рекурсивные HTTP-вызовы для разбитого на страницы API, пока не пройду все страницы. Каждая страница содержит массив ресурсов, которые я объединю во временный массив. Как только я прочитаю все страницы, я хочу вернуть единый массив всех ресурсов в качестве наблюдаемого.

Я использовал канал с оператором расширения. Я замечаю, что могу распечатать свой окончательный массив по завершении подписки, но я не знаю, как элегантно вернуть этот массив в качестве наблюдаемого. Есть ли какой-нибудь способ дождаться завершения одной наблюдаемой, а затем вернуть другую?

 getAllAssets(sort: string, filter: AssetFilter): Observable<Asset[]> {
  let allAssets: Asset[] = [];
  console.log('getAllAssets()', sort, filter);
  this._getAllAssets(null, null).subscribe((moreAssets) => {
    allAssets = allAssets.concat(moreAssets);
  }, null, () => console.log(allAssets));
  return of(allAssets);
}

_getAllAssets(sort: string, filter: AssetFilter) {
  let currentPage = 0;
  return this.getAssets(null, null, sort, filter).pipe(
    expand(assetsPage => {
      if (assetsPage amp;amp; assetsPage.page.number   1 < assetsPage.page.totalPages) {
        return this.getAssets(currentPage  = 1, null, sort, filter);
      }
      return empty();
    }),
    map((value) => value._embedded.assets)
  );
}

getAssets(page: number, size: number, sort: string, filter: AssetFilter): Observable<Assets> {
  let url = 'https://gateway.'   environment.region   '.mindsphere.io/api/assetmanagement/v3/assets?';
  if (page) {
    url  = 'page='   page;
  }
  if (size) {
    url  = 'size='   size;
  }
  if (sort) {
    url  = 'sort='   sort;
  }
  if (filter) {
    url  = 'filter='   JSON.stringify(filter);
  }
  return this.http.get<Assets>(url, {
    headers: new HttpHeaders().set('Content-Type', 'application/json').set('Authorization', 'Bearer '   this.accessToken)
  });
}
  

Я получаю правильный массив, напечатанный на экране, но я хотел бы вернуть этот массив как наблюдаемый без создания наблюдаемого, возврата наблюдаемого и отправки ему по завершении.

РЕДАКТИРОВАТЬ: Мое исправление заключалось в использовании toArray() , как указано ниже. Затем я просто сделал карту, чтобы превратить Observable<item[][]> в Observable<item[]>

 getAllAssets(sort: string, filter: AssetFilter): Observable<Asset[]> {
  let currentPage = 0;
  return this.getAssets(null, null, sort, filter).pipe(
    expand(assetsPage => {
      if (assetsPage amp;amp; assetsPage.page.number   1 < assetsPage.page.totalPages) {
        return this.getAssets((currentPage  = 1), null, sort, filter);
      }
      return empty();
    }),
    map(value => value._embedded.assets),
    toArray()
  ).pipe(map(assets => [].concat.apply([], assets)));
}

getAssets(page: number, size: number, sort: string, filter: AssetFilter): Observable<Assets> {
  let url = 'https://gateway.'   environment.region   '.mindsphere.io/api/assetmanagement/v3/assets?';
  if (page) {
    url  = 'page='   page;
  }
  if (size) {
    url  = 'size='   size;
  }
  if (sort) {
    url  = 'sort='   sort;
  }
  if (filter) {
    url  = 'filter='   JSON.stringify(filter);
  }
  return this.http.get<Assets>(url, {
    headers: new HttpHeaders().set('Content-Type', 'application/json').set('Authorization', 'Bearer '   this.accessToken)
  });
}
  

Ответ №1:

Это немного кода для просеивания, поэтому я собираюсь сократить и обобщить, поскольку это общий шаблон, это примет форму:

    _getAllPages(page, constantArgs, lastSet) {
     lastSet = lastSet || []; // deal with entry
     return this.getPage(page, constantArgs).pipe( // fetch the page
        switchMap(res => 
          (res.nextPage) // if nextpage, recurse and concat to last set
             ? this._getAllPages(res.nextPage, lastSet.concat(res.items))
             : of(lastSet.concat(res.items)))); // else break
   }
  

для целей стиля я обычно делаю это приватным, и у меня будет публичный, подобный:

 getAllPages(constantArgs) {
  return this._getAllPages(0, constantArgs);
}
  

чтобы сделать API лучше / проще для понимания.

Ответ №2:

Вам нужен toArray оператор

Вот пример с timer

 timer(0, 5).pipe(
    take(3),
    toArray()
  )
  

toArray

Комментарии:

1. Это отлично работает, но что, если бы я хотел объединить все те массивы, которые возвращаются. Это заставило бы мою функцию возвращать, Observable<Asset[][]> но я хочу Observable<Asset[]> только с одним массивом всех элементов.

2. @AustenStone тогда вы могли бы forkJoin использовать все наблюдаемые, которые вам нужны

3. toArray заключается в объединении всех значений в потоке в массив. forkJoin заключается в объединении последних значений в потоках. Объединяйте их по мере необходимости. Одно важное замечание : попробуйте иметь только одну подписку и поместите ее в конец цепочки вызовов. Если это служебный код, которым вы поделились — удалите из него подписку, просто пройдите через наблюдаемую цепочку.