#angular #rxjs #rxjs6
#angular #rxjs #rxjs6
Вопрос:
Пытаюсь найти нужный мне рецепт, но нигде не могу его найти.
У меня есть код, который выглядит следующим образом.
const Listeners = listen("data:join"); /* observable of people who want data */
const DataStream = stream("data"); /* observable of data */
Как я могу построить конвейер, который:
- Для каждого
person
, который присоединяется к потоку моих слушателей, я подписываю их на поток данных. - Каждый пользователь, который запускает
data:leave
событие, отписывается от потока - Длинный список операторов канала DataStream под капотом запускается только один раз, А не по одному разу для каждого человека, который присоединяется.
РЕДАКТИРОВАТЬ: что эквивалентно этому с точки зрения безопасности памяти:
Listeners.subscribe((personListening) => {
DataStream.subscribe((data) => personListening.send(data))
// And until fromEvent(personListening, "data:leave") fires.
})
/* OR */
DataStream.subscribe((data) => {
Listeners.subscribe((person) => {
person.send(data);
})
})
Ответ №1:
Я не совсем уверен в вашем поведении observables, но на общем уровне вы могли бы использовать любой из операторов отображения RxJS более высокого порядка (например, switchMap
, concatMap
и т.д. — различия здесь) Для отображения из одного наблюдаемого в другой. И используйте оператор RxJS takeUntil
для завершения / отмены подписки на observable на основе другого observable.
Вы могли бы использовать takeUntil
также для закрытия всех открытых подписок при закрытии компонента.
Попробуйте следующее
import { Subject } from 'rxjs';
import { tap, takeUntil, switchMap } from 'rxjs/operators';
complete$ = new Subject<any>();
Listeners.pipe(
switchMap((personListening) => { // <-- switch to the `DataStream` observable
return DataStream.pipe(
tap((data) => personListening.send(data)), // <-- call `send()` here
takeUntil(fromEvent(personListening, 'data:leave'))
);
}),
takeUntil(this.complete$) // emit `complete$` on `ngOnDestroy` hook
).subscribe(
_, // <-- do nothing on response
(err) => console.log(err) // <-- handle error
);
ngOnDestroy() {
this.complete$.next(); // <-- close any open subscriptions
}
Комментарии:
1. Моя проблема заключалась в том, что это то, что я делал, но каждый прослушиватель каждый раз запускал все внутренние каналы, которые есть у DataStream под капотом, и там были побочные эффекты, которые я не хотел запускать дважды. Теперь я изменил свой внутренний код (под капотом DataStream), и он отлично работает, на самом деле это мое оригинальное решение, которое вы можете увидеть перед моими правками XD, но оно будет помечено как ответ
Ответ №2:
Я думаю, вы хотите посмотреть на операторы skip и take в rxjs.
Пример:
const data = interval(1000);
const start = timer(4500);
const end = timer(21800);
data.pipe(
skipUntil(start),
takeUntil(end),
).subscribe(console.log);
data
здесь непрерывный поток эмиссий с возрастающим числом каждую секунду. start
и end
выдавать один раз через определенное время. В консоли вы увидите ограниченный диапазон передаваемых данных.
Stackblitz:https://stackblitz.com/edit/rxjs-ccdfif?file=index.ts
Комментарии:
1. Добавлено редактирование, извините, чтобы прояснить то, что я хочу, нужна безопасная версия этого последнего РЕДАКТИРОВАНИЯ, и я не думаю, что это работает, это сработало бы, если бы у меня был только один человек в слушателях