rxjs — оператор с обратными вызовами как при следующем вызове, так и при ошибке?

#javascript #node.js #rxjs

#javascript #node.js #rxjs

Вопрос:

Я ищу эквивалент promise.then(onNextCallback,onErrorCallback) в rxjs.

есть ли что-то подобное этому?

  • pipe(concatMap(),catchError) альтернативы — это не то, что я ищу.

Комментарии:

1. Почему вы не ищете concatMap и catchError ? Вы всегда можете вложить их таким образом, чтобы обратный вызов с ошибкой не обрабатывал ошибки из следующего обратного вызова. Но да, похоже, что для этой довольно примитивной операции нет встроенной функции.

2. можете ли вы привести пример для: You can always nest them in a way that the error callback doesn't handle errors from the next callback ? если вы можете это сделать, предложите записать это как в ответе.

3. Если вы недостаточно хорошо знаете rxjs, чтобы предоставить рабочий код, но я имею в виду что-то вроде pipe(map(value => ({ok:true, value})), catchError(err => ({ok: false, value: err})), concatMap(({ok, value}) => ok ? handleSucess(value) : handleError(value)))

4. Связанная тема в Твиттере: twitter.com/OliverJAsh/status/1291792172413530114

5. @Stav Не беспокойся, но для меня это была не случайная тема. 🙂 Это был вопрос, который Оливер задал мне напрямую.

Ответ №1:

Я также просил об этом и в итоге написал свой собственный.

 import * as RxJS from 'rxjs';

/**
 * Like `promise.then(onFulfilled, onRejected)`. This is *not* the same as
 * `map(onNext).catchError(onError)`, because this operator will only catch errors resulting from
 * the original observable—not errors resulting from `onNext`.
 */
export const mapOrCatchError = <T, B>(
  onNext: (value: T) => B,
  onError: (value: unknown) => B,
): RxJS.OperatorFunction<T, B> => ob$ =>
  new RxJS.Observable<B>(observer =>
    ob$.subscribe({
      next: t => {
        let next: B;
        try {
          next = onNext(t);
        } catch (error) {
          observer.error(error);
          return;
        }
        observer.next(next);
      },
      error: error => {
        let next: B;
        try {
          next = onError(error);
        } catch (newError) {
          observer.error(newError);
          return;
        }
        observer.next(next);
        observer.complete();
      },
      complete: () => {
        observer.complete();
      },
    }),
  );
  

Тесты:

 import { marbles } from 'rxjs-marbles/jest';
import { mapOrCatchError } from '../operators';

describe('mapOrCatchError', () => {
  it(
    'should map',
    marbles(m => {
      const source$ = m.cold('--(a|)', { a: 1 });
      const expected = '      --(b|)';

      const actual$ = source$.pipe(
        mapOrCatchError(
          a => a   1,
          _error => 0,
        ),
      );
      m.expect(actual$).toBeObservable(expected, { b: 2 });
    }),
  );
  it(
    'should catch',
    marbles(m => {
      const source$ = m.cold('--#');
      const expected = '      --(a|)';

      const actual$ = source$.pipe(
        mapOrCatchError(
          a => a   1,
          _error => 0,
        ),
      );
      m.expect(actual$).toBeObservable(expected, { a: 0 });
    }),
  );
  it(
    'should error if error handler throws',
    marbles(m => {
      const source$ = m.cold('--#');
      const expected = '      --#';

      const error = new Error('foo');
      const actual$ = source$.pipe(
        mapOrCatchError(
          a => a   1,
          _error => {
            throw error;
          },
        ),
      );
      m.expect(actual$).toBeObservable(expected, undefined, error);
    }),
  );
  it(
    'should not catch errors thrown by map function',
    marbles(m => {
      const source$ = m.cold('--(a|)');
      const expected = '      --#';

      const error = new Error('foo');
      const actual$ = source$.pipe(
        mapOrCatchError(
          () => {
            throw error;
          },
          _error => 'caught error',
        ),
      );
      m.expect(actual$).toBeObservable(expected, undefined, error);
    }),
  );
});
  

Комментарии:

1. Это могло бы быть немного более продвинутым, например, функция catch должна иметь возможность возвращать Observable точно так же, catchError как это делает. Для этого реализация становится немного сложнее, поэтому я не стал беспокоиться. Но если кто-то хочет сделать еще один шаг, будьте моим гостем: D

2. но я думал, что оба обратных вызова поддерживают обещания / наблюдаемые, ха-ха. я пропустил это: P я попытаюсь взять это отсюда

3. может быть, вы можете добавить в заголовок, что оба обратных вызова поддерживают только вычисления синхронизации.

Ответ №2:

Действительно, я не знаю ни одного оператора, который делал бы то, что вам нужно.

Однако я предлагаю создать свой собственный.

 function promiseLike<T>(
  onNext: (data: T) => Observable<T>,
  onError: (err: any) => Observable<any>
) {

  type ErrorWrapper = {isError: boolean, err: any};
  const isErrorWrapper = (err: any): err is ErrorWrapper => {
    return err.isError amp;amp; err.err !== undefined;
  }

  return function(source: Observable<T>): Observable<T> {
    return source.pipe(
      catchError((err) => of({isError: true, err})),
      switchMap((data) => isErrorWrapper(data) ? onError(data.err) : onNext(data))
    );
  }
}
  

Вышесказанное в основном переносит ошибку из наблюдаемого источника, а затем мы switchMap решаем, запускать ли мне onNext и onError . Таким образом, onError наверняка не будет перехватывать возможные ошибки, исходящие от onNext .

Вот пример использования:

 function getResolvingPromise(): Promise<string> {
  return Promise.resolve('SUCCESS');
}

function getErrorPromise(): Promise<string> {
  return Promise.reject('onError');
}

// Example with success
from(getResolvingPromise()).pipe(
  promiseLike(
    (data) => of(`received ${data}`),
    (err) => of(3)
  )
).subscribe((d) => console.log('Ex1', d)) // Logs Ex1 received SUCCESS

// Example with error
from(getErrorPromise()).pipe(
  promiseLike(
    (data) => of(`received ${data}`),
    (err) => of(3)
  )
).subscribe((d) => console.log('Ex2', d)) // Logs Ex2 3

// Example with internal error 2
from(getResolvingPromise()).pipe(
  promiseLike(
    (data) => throwError('Valid token not returned'),
    (err) => of(3)
  ),
  catchError(() => of('catching internal error'))
).subscribe((d) => console.log('Ex3', d)) // Logs Ex3 catching internal error
  

Ответ №3:

Как насчет чего-то подобного?

source$ доступен ли исходный код, и вы можете предоставить соответствующие функции с помощью оператора tap внутри pipe .

 source$.pipe(
  tap({ next: onNextCallback, error: onErrorCallback })
)
  

Комментарии:

1. Мне нужно вернуть наблюдаемые значения из обратных вызовов next и error . вот почему я ищу promise.then эквивалент 😉

2. @StavAlfi Ах, я вижу

Ответ №4:

.toPromise() Это то, что вы ищете?

https://www.learnrxjs.io/learn-rxjs/operators/utility/topromise

Затем вы можете выполнить свой .then(nextCallback) .catch(errorCallback)

Комментарии:

1. это был мой обходной путь, но это плохой обходной путь, потому что он работает, только если наблюдаемое конечное, и я согласен с ожиданием, пока наблюдаемое не будет выполнено.