#angular #rxjs #observable
#angular #rxjs #наблюдаемый
Вопрос:
Я пытаюсь пинговать URL / API непрерывно с помощью rxjs.
Моя первая попытка была:
timer(3000, 2000)
.pipe(mergeMap(() => this._http.get(environment.pingUrl)))
.subscribe(res => console.log(res), err => console.log(err))
Но когда URL недоступен, я никогда не получаю ответа. Поэтому я решил установить тайм-аут:
timer(3000, 2000)
.pipe(mergeMap(() => this._http.get(environment.pingUrl).pipe(timeout(5000)))
.subscribe(res => console.log(res), err => console.log(err))
Теперь я получаю первый тайм-аут / ответ, когда api недоступен, но затем кажется, что тайм-аут вызывает отмену подписки для таймера, и дальнейшего ответа нет. Я понятия не имею, как предотвратить эту отмену подписки.
Ответ №1:
Об ошибках
В RxJS наблюдаемые объекты имеют три типа выбросов.
next
: Значение в steam, может быть от 0 до произвольно многих из нихerror
: Эмиссия терминала. Этот экземпляр наблюдаемого имеет ошибку и закрыт. Он никогда не сможет выдать снова.complete
: Эмиссия терминала. Этот экземпляр наблюдаемого выполнен и закрыт. Он больше никогда не будет выдавать.
Вы заметите, что экземпляр наблюдаемого объекта никогда не может выдавать более одного error
или complete
эмиссии, но любой наблюдаемый объект может запускаться (или перезапускаться / повторяться) любое количество раз. Каждый раз, когда вы подписываетесь или какой-либо оператор подписывается за вас, вы создаете новый экземпляр наблюдаемого.
Создание потока, который выдает ошибки, но все равно продолжается после этого
Поскольку наблюдаемые объекты никогда не могут излучать после ошибки, единственный способ генерировать ошибки при продолжении — это перехватить error
излучение и превратить его в next
излучение.
Пример:
В этом примере каждый раз, когда источник обнаруживает наблюдаемые ошибки, я сообщаю catchError
о повторной подписке на источник.
- Я испускаю регулярные выбросы внутри объекта со
{value: }
свойством - Я выдаю ошибки внутри объекта со
{error: }
свойством.
Я справляюсь с этими обнаруженными ошибками, просто выводя их на консоль, но вы, вероятно, захотите сделать что-то более приятное, чем это 🙂
timer(3000, 2000).pipe(
mergeMap(_ => this._http.get(environment.pingUrl)),
map(value => ({value})),
timeout({each: 5000}),
catchError((error, src) => {
// Only catch/handle TimeoutError
if(error instanceof TimeoutError){
return src.pipe(startWith({error}));
} else {
return throwError(() => error);
}
})
).subscribe({
next: emitted => {
if("error" in emitted){
// Found an error! In this case, we only handled
// TimeoutErrors, so that'll appear here as a next
// emission. The observable is still running.
console.log("Caught an error: ", emitted.error);
}else{
// got a result!
console.log(emitted.value);
}
},
// All other errors appear here and are error emissions,
// so they're terminal. This observable is closed
error: err => console.log("Uncaught error: ", err),
// The complete emission is also terminal. This
// observable is now closed.
complete: () => console.log("Complete")
});
Ответ №2:
Насколько я понимаю, вы хотели бы непрерывно пинговать API и продолжать, когда этот API недоступен.
Проблема, с которой вы сталкиваетесь, заключается в том, что при сбое API наблюдаемое завершается и больше не запускается.
Вам нужно будет использовать повторную попытку или повторную попытку, когда
и просто сделайте
timer(2000, 3000)
.pipe(
mergeMap(() => this._http.get(environment.pingUrl)),
timeout(5000),
retryWhen(() => timer(3000))
)
.subscribe(
(res) => console.log(res),
(err) => console.log(err)
);
пример: https://stackblitz.com/edit/rxjs-playground-test-mgppcj
Обязательно отпишитесь от этого наблюдения в какой-то момент.