#angular #rxjs
#angular #rxjs
Вопрос:
Я пытаюсь выяснить, как отправлять AJAX-запросы каждые пять секунд при отображении данных и их сокращении с помощью операторов RxJS.
Итак, есть простой сервис Angular, который запрашивает внешний API для астронавтов, которые в данный момент находятся в космосе: http://api.open-notify.org/astros.json
В службе я пытаюсь сначала установить interval(5000)
, а затем использовать оператор сопоставления RxJS для сопоставления входящего номера с запросом GET.
Задача такова: мне нужно сначала получить астронавтов, затем найти тех, кто летит на МКС, и затем этих астронавтов я могу отобразить в виде. Данные должны обновляться каждые 5 секунд; поэтому мне нужно повторно отправлять один и тот же HTTP-запрос каждые 5 секунд, что я могу сделать с setInterval()
. И это не самое чистое решение проблемы.
Вот код:
import { Injectable } from '@angular/core';
import { HttpClient, HttpErrorResponse } from '@angular/common/http';
import { Observable, throwError, interval } from 'rxjs';
import { catchError, filter, concatMap, flatMap, map, reduce } from 'rxjs/operators';
import { Astronaut } from '../models/astronaut.model';
const astronautsUri = 'http://api.open-notify.org/astros.json';
@Injectable({
providedIn: 'root'
})
export class IssLocatorService {
constructor(private http: HttpClient) {}
getAstronauts(): Observable<Astronaut[]> {
return interval(5000).pipe(
concatMap(num => this.http.get(astronautsUri)),
flatMap((newAstronauts: any) => newAstronauts.people),
filter((astronaut: Astronaut) => astronaut.craft === 'ISS'),
map((astronaut: Astronaut) => [astronaut]),
reduce((prev, next) => [...prev, ...next]),
catchError(this.handleError)
);
}
}
Увы, код не работает.
Хотя поток действительно поступает к reduce()
оператору, оператор не возвращается к subscribe()
методу в компоненте.
Мне кажется, однако, что решение должно работать нормально. Вот как, я думаю, это работает:
- Через 5 секунд выдается первое число 0, и внешняя наблюдаемая величина сопоставляется с внутренней наблюдаемой величиной — AJAX-запросом.
concatMap()
ожидает, пока внутренняя наблюдаемая не завершится, и только тогда получает второе число — 1. - Внутренним наблюдаемым является HTTP-запрос, который возвращает JSON. Мне нужны только люди из объекта, который выглядит похожим на этот:
{success:true, people: [...]}
), поэтому затем я используюflatMap()
для преобразования этого объекта в массив астронавтовpeople
. Каждый объект вpeople
этом случае становится наблюдаемым из-за того, какflatMap()
работает. - Я фильтрую каждый объект astronaut.
- Я сопоставляю каждый объект astronaut с массивом, подлежащим уменьшению.
- Я уменьшаю массив астронавтов. Другими словами,
astronauts.people
воспроизводится благодаряreduce()
. - (Вот проблема)
reduce()
, согласно спецификации, должен вернуться кsubscribe
, потому что внутренняя наблюдаемая завершена. Но этого не происходит:reduce()
ожидает, пока следующее число не будет выданоinterval()
и сопоставлено с внутренним наблюдаемым, сноваpeople
возвращаются и помещаются в тот же массив. И это продолжается и продолжается.
Если я заменю reduce()
на scan
, массив или астронавты действительно будут возвращены subscribe
методу. Однако этот массив постоянно увеличивается из-за того, что в него неоднократно помещаются объекты astronaut.
Следующий подход работает просто отлично:
return this.http.get(astronautsUri).pipe(
flatMap((newAstronauts: any) => newAstronauts.people),
filter((astronaut: Astronaut) => astronaut.craft === 'ISS'),
map((astronaut: Astronaut) => [astronaut]),
reduce((prev, next) => [...prev, ...next]),
catchError(this.handleError)
);
Но в этом случае мне приходится вручную устанавливать интервал с помощью setInterval()
в классе компонента, который отображает астронавтов, и я должен вызвать getAstronauts()
метод. Итак, в ngOnInit
, по сути, есть два вызова метода.
Как я могу достичь желаемого эффекта только с помощью операторов RxJS? Я хочу настроить интервал, сопоставить и отфильтровать и уменьшить массив объектов, а затем получить их.
Мое понимание того, как работает отображение RxJS, действительно плохое, но я попробовал (ради попытки) все эти методы — switchMap()
, concatMap()
, exhaustMap()
, flatMap()
interval()
— для отображения чисел из ,, в запрос AJAX. Это все еще не работает.
Ответ №1:
Я думаю, чего вы пытаетесь достичь, это
getAstronauts(): Observable<Astronaut[]> {
return interval(5000).pipe(
concatMap(num =>
this.http.get(astronautsUri).pipe(
flatMap((newAstronauts: any) => newAstronauts.people),
filter((astronaut: Astronaut) => astronaut.craft === 'ISS'),
reduce((prev, astronaut) => [...prev, astronaut], []),
)
),
catchError(this.handleError)
);
}
проблема метода reduce заключается в том, что он ожидает завершения работы своего исходного observable, прежде чем выдавать какое-либо значение. в моем исходном коде — это элементы одного запроса, а в рассматриваемом примере источником являются все элементы из всех запросов, которые никогда не заканчиваются
Комментарии:
1. Спасибо за ответ. Это то, что многим людям может не понравиться — вложенные каналы. Хотя, я полагаю, другого подхода нет. Все работает нормально, так что еще раз спасибо.
2. этого можно добиться с помощью
windowTime
оператора, поскольку вы знаете, что все эти элементы массива будут поступать через интервал 5000. Вы получите 1 уровень вложенности, но это может быть немного затруднено, если выполнение какого-либо запроса занимает слишком много времени
Ответ №2:
Измените reduce на scan, scan выполняет то же самое, что и reduce, но генерирует на каждой итерации, reduce генерирует только после завершения всего потока.
const { from } = rxjs;
const { scan } = rxjs.operators;
const obs$ = from([1,2,3,4,5]);
obs$.pipe(scan((total, item) => total item)).subscribe(val => { console.log(val); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>
const { from } = rxjs;
const { reduce } = rxjs.operators;
const obs$ = from([1,2,3,4,5]);
obs$.pipe(reduce((total, item) => total item)).subscribe(val => { console.log(val); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>
Комментарии:
1. Это не работает.
scan()
возвращает промежуточное значение, но массив все время увеличивается, потому что в него помещаются одни и те же объекты astronaut.