Как я должен выдавать одно значение при завершении observable?

#rxjs #observable #rxjs6

#rxjs #наблюдаемый #rxjs6

Вопрос:

Я хочу выдать одно значение при завершении исходного observable, скажем, как показано ниже, используя воображаемый оператор mapComplete :

 let arr = ['a','b', 'c'];

from(arr)
.pipe(mapComplete(()=>'myValue'))
.pipe(map((v)=>`further processed: ${v}`))
.subscribe(console.log)
//further processed: myValue
 

Я попробовал следующие, которые работают, но не кажутся подходящими:

1.

 from(arr)
.pipe(toArray())
.pipe(map(()=>'myValue'))
.pipe(map((v)=>`further processed: ${v}`))
.subscribe(console.log);
//further processed: myValue
 

Проблема: если исходная наблюдаемая представляет собой огромный поток, я не хочу буферизировать его в массив, просто чтобы выдать одно значение.

2.

 from(arr)
.pipe(last())
.pipe(map(()=>'myValue'))
.pipe(map((v)=>`further processed: ${v}`))
.subscribe(console.log);
//further processed: myValue
 

проблема: если поток завершается без каких-либо сообщений, я получаю сообщение об ошибке: [Error [EmptyError]: no elements in sequence]

Каков был бы правильный (в терминах rxjs) способ сделать вышеуказанное?

Ответ №1:

Вы можете добиться этого, ignoreElements чтобы ничего не выдавать и endWith выдавать значение при завершении.

 from(arr).pipe(
  ignoreElements(),
  endWith('myValue'),
  map(v => `further processed: ${v}`)
).subscribe(console.log);
 

Если вы хотите выполнить функцию map , которую вы могли бы использовать count() заранее, чтобы выдать одно значение при завершении (количество выдаваемых значений).

 from(arr).pipe(
  count(), // could also use "reduce(() => null, 0)" or "last(null, 0)" or "takeLast(1), defaultIfEmpty(0)" 
  map(() => getMyValue()),
  map(v => `further processed: ${v}`)
).subscribe(console.log);
 

Комментарии:

1. Хороший ответ. Одно замечание: для полноты картины, поскольку воображаемое mapComplete() принимает лямбда-выражение, было бы неплохо предоставить решение, которое вычисляет значение во время выдачи. Я думаю, что одна дополнительная карта решит это, но, возможно, есть и другое решение..

2. @MarinosAn Да, вы могли бы использовать map after endWith . Единственный другой вариант, который короче, о котором я мог подумать, был бы count раньше map . Есть также несколько альтернатив count , но все они длиннее, как вы можете видеть в моем редактировании.

3. Я думаю, что ответ теперь завершен!

Ответ №2:

Вы можете достичь того, чего хотите, создав свой собственный пользовательский оператор.

Код может выглядеть следующим образом

 const emitWhenComplete = <T>(val: T) => <U>(source: Observable<U>) =>
  new Observable<T>((observer) => {
    return source.subscribe({
      error: (err) => observer.error(err),
      complete: () => {
        observer.next(val);
        observer.complete();
      },
    });
  });
 

По сути, этот оператор будет принимать исходное observable, игнорировать все значения, которые он выдает, и выдавать только при завершении источника.

Вы можете посмотреть на этот stackblitz для некоторых тестов.

Ответ №3:

Вы также можете использовать last() оператор со значением по умолчанию. Это устранит no elements in sequence ошибку, когда поток будет пуст.

 from(arr).pipe(
  last(null, 'myValue'),  // `null` denotes no predicate
  map(_ => 'myValue'),    // map the last value from the stream
  map((v)=>`further processed: ${v}`)
).subscribe(console.log);
 

Комментарии:

1. last(null, 'myValue') не выдает myValue последнее значение, если from(arr) что-то выдает. Поэтому, если from(arr) что-то выдает, вам все равно придется сопоставить его последнее значение с myValue with map(()=>'myValue') .

2. @fridoo: Хорошая мысль, я отредактировал сообщение. Спасибо.