#typescript #rxjs #reactive-programming
#typescript #rxjs #реактивное программирование
Вопрос:
Я внедряю службу аналитики в свое приложение Angular. Я создаю и добавляю сторонний скрипт в конструкторе службы. Может возникнуть состояние гонки, когда другие службы пытаются запустить события телеметрии до загрузки скрипта. Я хотел создать буфер для методов телеметрии для хранения сообщений до загрузки скрипта, а затем очистить буфер, а затем нажать как обычно после этого.
Псевдокод:
// Global variable set by 3rd party library after it loads
declare let analytics: any;
class AnalyticsService {
isLoaded$ = new BehaviorSubject<boolean>(false);
identify$ = new BehaviorSubject<user>(null);
constructor() {
this.loadScript();
// Here, I want to buffer messages until the 3rd party script initializes
// Once isLoaded$ fires, forEach the queue and pass to the 3rd party library
// Then for every message after, send one at a time as normal like
// the buffer wasn't there
this.identify$
.pipe(buffer(this.isLoaded$.pipe(skip(1)) // Essentially want to remove this after it fires
.subscribe((user) => analytics.identify(user));
}
loadScript(): void {
const script = document.createElement('script');
script.innerHTML = `
// setup function from 3rd party
`;
document.querySelector('head').appendChild(script);
interval(1000)
.pipe(take(5), takeUntil(this.isLoaded$))
.subscribe(_ => {
if (analytics) this.isLoaded$.next(true);
})
}
identify(user): void {
this.identify$.next(user);
}
}
Если бы я использовал две подписки, это было бы похоже
identify$
.pipe(
takeUntil(this.isLoaded$),
buffer(this.isLoaded$),
).subscribe(events => events.forEach(user => analytics.identify(user)));
identify$
.pipe(
filter(_ => this.isLoaded$.value),
).subscribe(user => analytics.identify(user))
Есть ли способ сделать это с одной подпиской?
Ответ №1:
Здесь может быть способ достичь того, что вы ищете:
constructor () {
this.loadScript();
this.identify$
.pipe(
buffer(
concat(
this.isLoaded$.pipe(skip(1), take(1)),
this.identify.pipe(skip(1)),
)
)
)
.subscribe((user) => analytics.identify(user));
}
Суть заключается в
concat(
this.isLoaded$.pipe(skip(1), take(1)),
this.identify$.pipe(skip(1)),
)
Итак, мы сначала ждем isLoaded$
, чтобы выдать true
, затем this.identify.pipe(skip(1))
будем подписаны.
После загрузки скрипта вы хотите немедленно продолжить при this.identify$
отправке. И вот почему мы подписываемся на него еще раз, из buffer
уведомителя о закрытии. В принципе, this.identify$
теперь у вас будет 2 подписчика. Первый subscribe((user) => analytics.identify(user))
— это, а второй — это тот, от concat
которого (который является buffer
уведомителем о закрытии). При this.identify$
отправке значение будет отправлено его подписчикам по порядку. Таким образом, значение в конечном итоге будет добавлено в буфер, а затем немедленно передано следующему подписчику в цепочке, this.identify$
потому что второй подписчик получит значение синхронно.
Комментарии:
1. Не могли бы вы уточнить, каков порядок? В вашем 4-м предложении говорится: «Первым является subscribe () …», а в вашем последнем предложении говорится: «Итак, значение в конечном итоге будет добавлено в буфер, а затем немедленно передано следующему подписчику …». Буферный канал первый правильно?
2. Извините, это действительно немного сбивает с толку. Второй — это тот, который поступает из внутренней подписки buffer. И поскольку это второй, при его отправке буфер уже будет содержать исходное отправленное значение.
Ответ №2:
Вот способ, которому не нужен буфер. Мы задерживаем каждое значение до тех пор, пока this.isLoaded$ не выдаст true . Поскольку это выглядит так.isLoaded$ — это BehaviorSubject , он будет выдан немедленно, как только это значение true, иначе это задержит доставку значения.
identify$.pipe(
delayWhen(() => this.isLoaded$.pipe(
filter(x => x)
)),
).subscribe(user => analytics.identify(user));
Обновление # 1: пользовательский оператор
Вот пользовательский оператор, который задерживает выбросы, в то время как функция возвращает true и выдает буферизованные и текущие значения, в то время как функция возвращает false .
В отличие от предыдущего решения, это сохранит порядок выбросов.
function delayWhile<T>(fn: (T)=>boolean): MonoTypeOperatorFunction<T>{
return s => new Observable<T>(observer => {
const buffer = new Array<T>();
const sub = s.subscribe({
next: (v:T) => {
if(fn(v)){
buffer.push(v);
}else{
if(buffer.length > 0){
buffer.forEach(observer.next.bind(observer));
buffer.length = 0;
}
observer.next(v);
}
},
complete: observer.complete.bind(observer),
error: observer.error.bind(observer)
})
return {unsubscribe: () => {sub.unsubscribe()}};
});
}
Здесь он используется:
identify$.pipe(
delayWhile(() => !this.isLoaded$.value)
).subscribe(user => analytics.identify(user));
Комментарии:
1. Просто чтобы уточнить, это сохранит значения, которые генерируются до isLoaded $, и воспроизведет их по порядку? Он ничего не делает, например, принимает только последнее значение?
2. Это должно быть, хотя я предполагаю, что нет встроенной гарантии, что они будут отправлены по порядку, я думаю, они будут основаны на том, как они добавляются в цикл событий JS. Если заказ очень важен, вам может понадобиться другое решение.
3. Обновлено пользовательским оператором, который буферизует отложенные значения внутри и выдает их на основе функции