RXJS — избегать внутренней подписки

#angular #rxjs #rxjs6

#angular #rxjs #rxjs6

Вопрос:

Пытаюсь найти нужный мне рецепт, но нигде не могу его найти.

У меня есть код, который выглядит следующим образом.

 const Listeners = listen("data:join"); /* observable of people who want data */
const DataStream = stream("data"); /* observable of data */
  

Как я могу построить конвейер, который:

  • Для каждого person , который присоединяется к потоку моих слушателей, я подписываю их на поток данных.
  • Каждый пользователь, который запускает data:leave событие, отписывается от потока
  • Длинный список операторов канала DataStream под капотом запускается только один раз, А не по одному разу для каждого человека, который присоединяется.

РЕДАКТИРОВАТЬ: что эквивалентно этому с точки зрения безопасности памяти:

 Listeners.subscribe((personListening) => {
     DataStream.subscribe((data) => personListening.send(data))
     // And until fromEvent(personListening, "data:leave") fires.
})
/* OR */
DataStream.subscribe((data) => {
    Listeners.subscribe((person) => {
        person.send(data);
    })
})
  

Ответ №1:

Я не совсем уверен в вашем поведении observables, но на общем уровне вы могли бы использовать любой из операторов отображения RxJS более высокого порядка (например, switchMap , concatMap и т.д. — различия здесь) Для отображения из одного наблюдаемого в другой. И используйте оператор RxJS takeUntil для завершения / отмены подписки на observable на основе другого observable.

Вы могли бы использовать takeUntil также для закрытия всех открытых подписок при закрытии компонента.

Попробуйте следующее

 import { Subject } from 'rxjs';
import { tap, takeUntil, switchMap } from 'rxjs/operators';

complete$ = new Subject<any>();

Listeners.pipe(
  switchMap((personListening) => {     // <-- switch to the `DataStream` observable
    return DataStream.pipe(
      tap((data) => personListening.send(data)),   // <-- call `send()` here
      takeUntil(fromEvent(personListening, 'data:leave'))
    );
  }),
  takeUntil(this.complete$)      // emit `complete$` on `ngOnDestroy` hook
).subscribe(
  _,                             // <-- do nothing on response
  (err) => console.log(err)      // <-- handle error
);

ngOnDestroy() {
  this.complete$.next();         // <-- close any open subscriptions
}
  

Комментарии:

1. Моя проблема заключалась в том, что это то, что я делал, но каждый прослушиватель каждый раз запускал все внутренние каналы, которые есть у DataStream под капотом, и там были побочные эффекты, которые я не хотел запускать дважды. Теперь я изменил свой внутренний код (под капотом DataStream), и он отлично работает, на самом деле это мое оригинальное решение, которое вы можете увидеть перед моими правками XD, но оно будет помечено как ответ

Ответ №2:

Я думаю, вы хотите посмотреть на операторы skip и take в rxjs.

Пример:

 const data = interval(1000);

const start = timer(4500);
const end = timer(21800);

data.pipe(
  skipUntil(start),
  takeUntil(end),
).subscribe(console.log);
  

data здесь непрерывный поток эмиссий с возрастающим числом каждую секунду. start и end выдавать один раз через определенное время. В консоли вы увидите ограниченный диапазон передаваемых данных.

Stackblitz:https://stackblitz.com/edit/rxjs-ccdfif?file=index.ts

Комментарии:

1. Добавлено редактирование, извините, чтобы прояснить то, что я хочу, нужна безопасная версия этого последнего РЕДАКТИРОВАНИЯ, и я не думаю, что это работает, это сработало бы, если бы у меня был только один человек в слушателях