Интервал RxJS, запрос AJAX и уменьшение

#angular #rxjs

#angular #rxjs

Вопрос:

Я пытаюсь выяснить, как отправлять AJAX-запросы каждые пять секунд при отображении данных и их сокращении с помощью операторов RxJS.

Итак, есть простой сервис Angular, который запрашивает внешний API для астронавтов, которые в данный момент находятся в космосе: http://api.open-notify.org/astros.json

В службе я пытаюсь сначала установить interval(5000) , а затем использовать оператор сопоставления RxJS для сопоставления входящего номера с запросом GET.

Задача такова: мне нужно сначала получить астронавтов, затем найти тех, кто летит на МКС, и затем этих астронавтов я могу отобразить в виде. Данные должны обновляться каждые 5 секунд; поэтому мне нужно повторно отправлять один и тот же HTTP-запрос каждые 5 секунд, что я могу сделать с setInterval() . И это не самое чистое решение проблемы.

Вот код:

 import { Injectable } from '@angular/core';
import { HttpClient, HttpErrorResponse } from '@angular/common/http';
import { Observable, throwError, interval } from 'rxjs';
import { catchError, filter, concatMap, flatMap, map, reduce } from 'rxjs/operators';
import { Astronaut } from '../models/astronaut.model';

const astronautsUri = 'http://api.open-notify.org/astros.json';

@Injectable({
  providedIn: 'root'
})
export class IssLocatorService {
  constructor(private http: HttpClient) {}

  getAstronauts(): Observable<Astronaut[]> {
    return interval(5000).pipe(
      concatMap(num => this.http.get(astronautsUri)),
      flatMap((newAstronauts: any) => newAstronauts.people),
      filter((astronaut: Astronaut) => astronaut.craft === 'ISS'),
      map((astronaut: Astronaut) => [astronaut]),
      reduce((prev, next) => [...prev, ...next]),
      catchError(this.handleError)
    );
  }
}
  

Увы, код не работает.
Хотя поток действительно поступает к reduce() оператору, оператор не возвращается к subscribe() методу в компоненте.

Мне кажется, однако, что решение должно работать нормально. Вот как, я думаю, это работает:

  1. Через 5 секунд выдается первое число 0, и внешняя наблюдаемая величина сопоставляется с внутренней наблюдаемой величиной — AJAX-запросом. concatMap() ожидает, пока внутренняя наблюдаемая не завершится, и только тогда получает второе число — 1.
  2. Внутренним наблюдаемым является HTTP-запрос, который возвращает JSON. Мне нужны только люди из объекта, который выглядит похожим на этот: {success:true, people: [...]} ), поэтому затем я использую flatMap() для преобразования этого объекта в массив астронавтов people . Каждый объект в people этом случае становится наблюдаемым из-за того, как flatMap() работает.
  3. Я фильтрую каждый объект astronaut.
  4. Я сопоставляю каждый объект astronaut с массивом, подлежащим уменьшению.
  5. Я уменьшаю массив астронавтов. Другими словами, astronauts.people воспроизводится благодаря reduce() .
  6. (Вот проблема) reduce() , согласно спецификации, должен вернуться к subscribe , потому что внутренняя наблюдаемая завершена. Но этого не происходит: reduce() ожидает, пока следующее число не будет выдано interval() и сопоставлено с внутренним наблюдаемым, снова people возвращаются и помещаются в тот же массив. И это продолжается и продолжается.

Если я заменю reduce() на scan , массив или астронавты действительно будут возвращены subscribe методу. Однако этот массив постоянно увеличивается из-за того, что в него неоднократно помещаются объекты astronaut.

Следующий подход работает просто отлично:

 return this.http.get(astronautsUri).pipe(
  flatMap((newAstronauts: any) => newAstronauts.people),
  filter((astronaut: Astronaut) => astronaut.craft === 'ISS'),
  map((astronaut: Astronaut) => [astronaut]),
  reduce((prev, next) => [...prev, ...next]),
  catchError(this.handleError)
);
  

Но в этом случае мне приходится вручную устанавливать интервал с помощью setInterval() в классе компонента, который отображает астронавтов, и я должен вызвать getAstronauts() метод. Итак, в ngOnInit , по сути, есть два вызова метода.

Как я могу достичь желаемого эффекта только с помощью операторов RxJS? Я хочу настроить интервал, сопоставить и отфильтровать и уменьшить массив объектов, а затем получить их.

Мое понимание того, как работает отображение RxJS, действительно плохое, но я попробовал (ради попытки) все эти методы — switchMap() , concatMap() , exhaustMap() , flatMap() interval() — для отображения чисел из ,, в запрос AJAX. Это все еще не работает.

Ответ №1:

Я думаю, чего вы пытаетесь достичь, это

 getAstronauts(): Observable<Astronaut[]> {
    return interval(5000).pipe(
      concatMap(num => 
        this.http.get(astronautsUri).pipe(
          flatMap((newAstronauts: any) => newAstronauts.people),
          filter((astronaut: Astronaut) => astronaut.craft === 'ISS'),
          reduce((prev, astronaut) => [...prev, astronaut], []),
        )
      ),
      catchError(this.handleError)
    );
  }
  

проблема метода reduce заключается в том, что он ожидает завершения работы своего исходного observable, прежде чем выдавать какое-либо значение. в моем исходном коде — это элементы одного запроса, а в рассматриваемом примере источником являются все элементы из всех запросов, которые никогда не заканчиваются

Комментарии:

1. Спасибо за ответ. Это то, что многим людям может не понравиться — вложенные каналы. Хотя, я полагаю, другого подхода нет. Все работает нормально, так что еще раз спасибо.

2. этого можно добиться с помощью windowTime оператора, поскольку вы знаете, что все эти элементы массива будут поступать через интервал 5000. Вы получите 1 уровень вложенности, но это может быть немного затруднено, если выполнение какого-либо запроса занимает слишком много времени

Ответ №2:

Измените reduce на scan, scan выполняет то же самое, что и reduce, но генерирует на каждой итерации, reduce генерирует только после завершения всего потока.

 const { from } = rxjs;
const { scan } = rxjs.operators;

const obs$ = from([1,2,3,4,5]);

obs$.pipe(scan((total, item) => total   item)).subscribe(val => { console.log(val); });  
 <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>  

 const { from } = rxjs;
const { reduce } = rxjs.operators;

const obs$ = from([1,2,3,4,5]);

obs$.pipe(reduce((total, item) => total   item)).subscribe(val => { console.log(val); });  
 <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>  

Комментарии:

1. Это не работает. scan() возвращает промежуточное значение, но массив все время увеличивается, потому что в него помещаются одни и те же объекты astronaut.