RxJS обнаруживает ошибку и продолжает

#angular #rxjs #observable

Вопрос:

У меня есть список элементов для анализа, но анализ одного из них может завершиться неудачей.

Каков «Rx-способ» поймать ошибку, но продолжить выполнение последовательности

Пример кода:

 var observable = Rx.Observable.from([0,1,2,3,4,5])
.map(
  function(value){
      if(value == 3){
        throw new Error("Value cannot be 3");
      }
    return value;
  });

observable.subscribe(
  function(value){
  console.log("onNext "   value);
  },
  function(error){
    console.log("Error: "   error.message);
  },
  function(){
    console.log("Completed!");
  }); 
 <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script> 

Что я хочу сделать не по-Rx-способу:

 var items = [0,1,2,3,4,5];

for (var item in items){
  try{
    if(item == 3){
      throw new Error("Value cannot be 3");
    }
    console.log(item);
  }catch(error){
     console.log("Error: "   error.message);
  }
} 

Заранее спасибо

Комментарии:

1. См. Rx.Наблюдаемый. onErrorResumeNext

2. Видел это, но не могу заставить это работать таким образом… Не могли бы вы, пожалуйста, написать пример?

3. Смотрите, как Бен Лешс рассказывает об этом специально в 23:43.

Ответ №1:

Я бы посоветовал вам вместо этого использовать flatMap (теперь mergeMap в версии 5 rxjs), что позволит вам свернуть ошибки, если они вас не волнуют. По сути, вы создадите внутреннюю наблюдаемую, которую можно будет проглотить в случае возникновения ошибки. Преимущество этого подхода заключается в том, что вы можете объединить операторов в цепочку, и если где-либо в конвейере произойдет ошибка, она автоматически будет перенаправлена в блок catch.

 const {from, iif, throwError, of, EMPTY} = rxjs;
const {map, flatMap, catchError} = rxjs.operators;

// A helper method to let us create arbitrary operators
const {pipe} = rxjs;

// Create an operator that will catch and squash errors
// This returns a function of the shape of Observable<T> => Observable<R>
const mapAndContinueOnError = pipe(
  //This will get skipped if upstream throws an error
  map(v => v * 2),
  catchError(err => {
    console.log("Caught Error, continuing")
    //Return an empty Observable which gets collapsed in the output
    return EMPTY;
  })
)

const observable = from([0, 1, 2, 3, 4, 5]).pipe(
  flatMap((value) => 
    iif(() => value != 3, 
      of(value), 
      throwError(new Error("Value cannot be 3"))
    ).pipe(mapAndContinueOnError)
  )
);

observable.subscribe(
  (value) => console.log("onNext "   value), (error) => console.log("Error: "   error.message), () => console.log("Completed!")
); 
 <script src="https://unpkg.com/rxjs@7.0.0/dist/bundles/rxjs.umd.min.js"></script> 

Комментарии:

1. что делать, если ошибка произойдет где-то в цепочке после flatMap ?

2. @nonybright, чтобы затем он уничтожил поток и выдал ошибку подписчику, если только где — то ниже по потоку не было другого подвоха.

3. Это, похоже, не работает в RxJS v6. Я попытался перенести код, и наблюдаемое завершается после обнаружения ошибки. Не могли бы вы обновить свое решение? Я создал здесь штакблица для справки. Воспроизводимый код Стекблитца.

4. @ManishShrestha Вот ты где!

5. Мне нравится этот ответ за — голую pipe функцию, рабочий пример RxJS и const деструкцию импорта. Каждый день-это школьный день! — ох и за iif что !

Ответ №2:

Вам нужно переключиться на новый одноразовый поток, и если в нем возникнет ошибка, он будет безопасно удален и сохранит исходный поток живым:

 Rx.Observable.from([0,1,2,3,4,5])
    .switchMap(value => {

        // This is the disposable stream!
        // Errors can safely occur in here without killing the original stream

        return Rx.Observable.of(value)
            .map(value => {
                if (value === 3) {
                    throw new Error('Value cannot be 3');
                }
                return value;
            })
            .catch(error => {
                // You can do some fancy stuff here with errors if you like
                // Below we are just returning the error object to the outer stream
                return Rx.Observable.of(error);
            });

    })
    .map(value => {
        if (value instanceof Error) {
            // Maybe do some error handling here
            return `Error: ${value.message}`;
        }
        return value;
    })
    .subscribe(
      (x => console.log('Success', x)),
      (x => console.log('Error', x)),
      (() => console.log('Complete'))
    ); 
 <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.1/Rx.min.js"></script> 

Более подробная информация об этой технике в этом посте в блоге: Поиски фрикаделек: Продолжайте потоки RxJS при возникновении ошибок

Комментарии:

1. Как бы это выглядело с операторами lettable и pipe ?

2. БУДЬТЕ ОСТОРОЖНЫ, если внутреннему наблюдаемому требуется некоторое время для излучения, он может никогда не излучаться при использовании switchMap() , поскольку он отменяет предыдущее наблюдаемое. Чтобы проверить это самостоятельно, замените Rx.Observable.of(value) этот ответ на Rx.Observable.from(new Promise((resolve, reject) => setTimeout(() => resolve(i)))) . Результатом будет только: «Успех 5, завершен». Замена switchMap() на mergeMap() исправляет проблему и обычно дает ожидаемые результаты.

Ответ №3:

Чтобы вы endlessObservable$ не умерли, вы можете поместить свой failingObservable$ в оператор отображения более высокого порядка (например switchMap , concatMap , exhaustMap …) и проглотить ошибку, завершив внутренний поток с empty() наблюдаемым возвратом без значений.

Использование RxJS 6:

 endlessObservable$
    .pipe(
        switchMap(() => failingObservable$
            .pipe(
                catchError((err) => {
                    console.error(err);
                    return EMPTY;
                })
            )
        )
    );
 

Комментарии:

1. Стараюсь, чтобы это было как можно проще для людей, ищущих рабочее решение.

2. более наглядный пример этой «двойной упаковки» на learnrxjs.io/operators/error_handling/catch.html (пример№. 3) или stackblitz.com/edit/rxjs-catcherror-withmapoperators

3. Это не отменит незавершенный failingObservable$, когда будет выпущен endlessObservable$?

4. @isevcik да, это будет — это просто пример; вы можете использовать любой из других операторов отображения более высокого порядка в зависимости от необходимого вам поведения.

5. emtpy() теперь устарел, EMPTY его следует использовать вместо этого.

Ответ №4:

Вы действительно можете использовать функцию try/catch внутри своей функции карты для обработки ошибки. Вот фрагмент кода

 var source = Rx.Observable.from([0, 1, 2, 3, 4, 5])
    .map(
        function(value) {
            try {
                if (value === 3) {
                    throw new Error("Value cannot be 3");
                }
                return value;

            } catch (error) {
                console.log('I caught an error');
                return undefined;
            }
        })
    .filter(function(x) {
        return x !== undefined; });


source.subscribe(
    function(value) {
        console.log("onNext "   value);
    },
    function(error) {
        console.log("Error: "   error.message);
    },
    function() {
        console.log("Completed!");
    }); 
 <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>