Как запустить массив запросов с помощью rxj, таких как forkJoin и combineLatest, но без необходимости ждать завершения ВСЕХ запросов, прежде чем увидеть результаты?

#javascript #angular #typescript #rxjs

Вопрос:

Итак, представьте, что у вас есть array список URL-адресов:

 urls: string[]
 

Вы делаете набор запросов (в этом примере я использую HttpClient.get от Angular, который возвращает наблюдаемый).

 const requests = urls.map((url, index) => this.http.get<Film>(url)
 

Теперь я хочу выполнять эти запросы одновременно, но не ждать всех ответов, чтобы увидеть все. Другими словами, если у меня есть что-то подобное films$: Observable<Film[]> , я хочу films$ обновлять постепенно каждый раз, когда приходит ответ.

Теперь, чтобы смоделировать это, вы можете обновить requests вышесказанное до чего-то подобного

 const requests = urls.map((url, index) => this.http.get<Film>(url).pipe(delay((index   1)* 1000))
 

С помощью приведенного выше массива Observable s вы должны получать данные из каждого запроса по одному, так как они не запрашиваются одновременно. Обратите внимание, что это просто подделка разных времен поступления данных из отдельных запросов. Сами запросы должны выполняться одновременно.

Цель состоит в том, чтобы обновлять элементы в films$ каждом значении времени, выдаваемом любым из запросов.

Так что раньше у меня было что-то подобное, когда я неправильно понимал, как combineLatest это работает

 let films$: Observable<Film[]> = of([]);
const requests = urls.map(url => this.http.get<Film>(url)
.pipe(
  take(1),
 // Without this handling, the respective observable does not emit a value and you need ALL of the Observables to emit a value before combineLatest gives you results.
 // rxjs EMPTY short circuits the function as documented. handle the null elements on the template with *ngIf. 
  catchError(()=> of(null))
));
// Expect a value like [{...film1}, null, {...film2}] for when one of the URL's are invalid for example.
films$ = combineLatest(requests); 
 

Я ожидал, что приведенный выше код films$ будет постепенно обновляться, игнорируя часть документации

Чтобы убедиться, что выходной массив всегда имеет одинаковую длину, combineLatest фактически будет ждать, пока все входные наблюдаемые будут выдавать по крайней мере один раз, прежде чем он начнет выдавать результаты.

А это не то, что я искал.

Если есть оператор или функция rxjs, которые могут достичь того, что я ищу, я могу создать более чистый шаблон, просто используя async канал и не обрабатывая null значения и неудачные запросы.

Я также пытался

 this.films$ = from(urls).pipe(mergeMap(url => this.http.get<Film>(url)));
 

и

 this.films$ = from(requests).pipe(mergeAll());
 

что неправильно, потому что возвращаемый тип значения Observable<Film> вместо Observable<Film[]> того, с которым я могу использовать шаблон *ngFor="let film of films$ | async" . Вместо этого, если я подпишусь на него, это будет похоже на то, как если бы я слушал сокет для одной записи, получая обновления в режиме реального времени (поступают отдельные ответы). Я могу вручную подписаться на любой из них и Array.push films: Film[] , например, создать отдельное свойство, но это противоречит цели (используйте Observable шаблон с async трубой).

Ответ №1:

scan Оператор будет хорошо работать для вас здесь:

 
const makeRequest = url => this.http.get<Film>(url).pipe(
  catchError(() => EMPTY))
);

films$: Observable<Film[]> = from(urls).pipe(
  mergeMap(url => makeRequest(url)),
  scan((films, film) => films.concat(film), [])
); 
 

Поток:

  • from выдает URL-адреса по одному за раз
  • mergeMap подписывается на «makeRequest» и отправляет результат в поток
  • scan накапливает результаты в массив и выдает каждый раз, когда поступает новое излучение

Чтобы сохранить порядок, я бы, вероятно, использовал combineLatest , так как он выдает массив в том же порядке, что и входные наблюдаемые. Мы можем начать с каждого наблюдаемого startWith(undefined) , а затем отфильтровать неопределенные элементы:

 
const requests = urls.map(url => this.http.get<Film>(url).pipe(startWith(undefined));

films$: Observable<Film[]> = combineLatest(requests).pipe(
  map(films => films.filter(f => !!f))
); 
 

Комментарии:

1. брат… дай мне это 👊

2. Как бы вы гарантировали, что порядок поступления данных будет соответствовать порядку URL-адресов? У меня есть свой способ сделать это, заменив array.concat часть еще несколькими строками. Просто любопытно, как бы вы сделали это сами.

3. Я бы, наверное, просто использовал combineLatest в этом случае. Я добавил пример.