Хотите буферизировать наблюдаемый объект до тех пор, пока не сработает другой, затем удалите буфер и запустите нормально с одной подпиской

#typescript #rxjs #reactive-programming

#typescript #rxjs #реактивное программирование

Вопрос:

Я внедряю службу аналитики в свое приложение Angular. Я создаю и добавляю сторонний скрипт в конструкторе службы. Может возникнуть состояние гонки, когда другие службы пытаются запустить события телеметрии до загрузки скрипта. Я хотел создать буфер для методов телеметрии для хранения сообщений до загрузки скрипта, а затем очистить буфер, а затем нажать как обычно после этого.

Псевдокод:

 // Global variable set by 3rd party library after it loads
declare let analytics: any;

class AnalyticsService {
  isLoaded$ = new BehaviorSubject<boolean>(false);
  identify$ = new BehaviorSubject<user>(null);

  constructor() {
    this.loadScript();

    // Here, I want to buffer messages until the 3rd party script initializes
    // Once isLoaded$ fires, forEach the queue and pass to the 3rd party library
    // Then for every message after, send one at a time as normal like 
    // the buffer wasn't there
    this.identify$
      .pipe(buffer(this.isLoaded$.pipe(skip(1)) // Essentially want to remove this after it fires
      .subscribe((user) => analytics.identify(user));
  }

  loadScript(): void {
    const script = document.createElement('script');
    script.innerHTML = `
      // setup function from 3rd party
    `;

    document.querySelector('head').appendChild(script);

    interval(1000)
      .pipe(take(5), takeUntil(this.isLoaded$))
      .subscribe(_ => {
        if (analytics) this.isLoaded$.next(true);
      })
  }

  identify(user): void {
    this.identify$.next(user);
  }
}
 

Если бы я использовал две подписки, это было бы похоже

 identify$
  .pipe(
    takeUntil(this.isLoaded$),
    buffer(this.isLoaded$),
  ).subscribe(events => events.forEach(user => analytics.identify(user)));

identify$
  .pipe(
    filter(_ => this.isLoaded$.value),
  ).subscribe(user => analytics.identify(user))
 

Есть ли способ сделать это с одной подпиской?

Ответ №1:

Здесь может быть способ достичь того, что вы ищете:

 constructor () {
  this.loadScript();

  this.identify$
    .pipe(
      buffer(
        concat(
          this.isLoaded$.pipe(skip(1), take(1)),
          this.identify.pipe(skip(1)),
        )
      )
    )
    .subscribe((user) => analytics.identify(user));
}
 

Суть заключается в

 concat(
  this.isLoaded$.pipe(skip(1), take(1)),
  this.identify$.pipe(skip(1)),
)
 

Итак, мы сначала ждем isLoaded$ , чтобы выдать true , затем this.identify.pipe(skip(1)) будем подписаны.

После загрузки скрипта вы хотите немедленно продолжить при this.identify$ отправке. И вот почему мы подписываемся на него еще раз, из buffer уведомителя о закрытии. В принципе, this.identify$ теперь у вас будет 2 подписчика. Первый subscribe((user) => analytics.identify(user)) — это, а второй — это тот, от concat которого (который является buffer уведомителем о закрытии). При this.identify$ отправке значение будет отправлено его подписчикам по порядку. Таким образом, значение в конечном итоге будет добавлено в буфер, а затем немедленно передано следующему подписчику в цепочке, this.identify$ потому что второй подписчик получит значение синхронно.

Комментарии:

1. Не могли бы вы уточнить, каков порядок? В вашем 4-м предложении говорится: «Первым является subscribe () …», а в вашем последнем предложении говорится: «Итак, значение в конечном итоге будет добавлено в буфер, а затем немедленно передано следующему подписчику …». Буферный канал первый правильно?

2. Извините, это действительно немного сбивает с толку. Второй — это тот, который поступает из внутренней подписки buffer. И поскольку это второй, при его отправке буфер уже будет содержать исходное отправленное значение.

Ответ №2:

Вот способ, которому не нужен буфер. Мы задерживаем каждое значение до тех пор, пока this.isLoaded$ не выдаст true . Поскольку это выглядит так.isLoaded$ — это BehaviorSubject , он будет выдан немедленно, как только это значение true, иначе это задержит доставку значения.

 identify$.pipe(
  delayWhen(() => this.isLoaded$.pipe(
    filter(x => x)
  )),
).subscribe(user => analytics.identify(user));
 

Обновление # 1: пользовательский оператор

Вот пользовательский оператор, который задерживает выбросы, в то время как функция возвращает true и выдает буферизованные и текущие значения, в то время как функция возвращает false .

В отличие от предыдущего решения, это сохранит порядок выбросов.

 function delayWhile<T>(fn: (T)=>boolean): MonoTypeOperatorFunction<T>{
  return s => new Observable<T>(observer => {
    const buffer = new Array<T>();
    const sub = s.subscribe({
      next: (v:T) => {
        if(fn(v)){
          buffer.push(v);
        }else{
          if(buffer.length > 0){
            buffer.forEach(observer.next.bind(observer));
            buffer.length = 0;
          }
          observer.next(v);
        }
      },
      complete: observer.complete.bind(observer),
      error: observer.error.bind(observer)
    })
    return {unsubscribe: () => {sub.unsubscribe()}};
  });
}
 

Здесь он используется:

 identify$.pipe(
  delayWhile(() => !this.isLoaded$.value)
).subscribe(user => analytics.identify(user));
 

Комментарии:

1. Просто чтобы уточнить, это сохранит значения, которые генерируются до isLoaded $, и воспроизведет их по порядку? Он ничего не делает, например, принимает только последнее значение?

2. Это должно быть, хотя я предполагаю, что нет встроенной гарантии, что они будут отправлены по порядку, я думаю, они будут основаны на том, как они добавляются в цикл событий JS. Если заказ очень важен, вам может понадобиться другое решение.

3. Обновлено пользовательским оператором, который буферизует отложенные значения внутри и выдает их на основе функции