Управление наблюдаемыми подписками с помощью синхронных потоков

#rxjs

#rxjs

Вопрос:

В чем проблема

Если наблюдаемый объект выполняется синхронно, то обратный вызов, который передается subscribe , выполняется перед subscribe возвратом. В результате следующий код выдает ошибку. (sub не инициализирован)

 const sub = from([1,2,3,4,5]).subscribe(x => {
  if(x > 3) sub.unsubscribe();
  console.log(x);
});
 

Решение Nieve

Если мы принудительно введем значения нашего потока в цикл событий, у нас больше не будет этой проблемы. Subscribe всегда будет возвращаться до вызова лямбда-выражения.

 const sub = from([1,2,3,4,5]).pipe(
  delay(0)
).subscribe(x => {
  if(x > 3) sub.unsubscribe();
  console.log(x);
});
 

Это, однако, кажется мне плохой идеей. Если ни по какой другой причине, кроме производительности. Хотя это также делает порядок выполнения менее детерминированным (какой браузер?, NodeJS?).

Идиоматическое решение RxJS

Не отписывайтесь сами, пусть оператор сделает это за вас

 const unsub = new Subject();
from([1,2,3,4,5]).pipe(
  takeUntil(unsub)
).subscribe(x => {
  if(x > 3) {
    unsub.next();
    unsub.complete();
  }
  console.log(x);
});
 

Проблема здесь в том, что нам нужно создать весь аппарат, который является субъектом, чтобы достичь очень конкретной цели. Это все равно что купить грузовик, чтобы получить колесо. Он плохо масштабируется. Наконец, точно так же, как calling unsubscribe() yourself, он также по-прежнему смешивает императивный и функциональный javascript.

Та же проблема в большем масштабе

Рассмотрим оператор, который принимает список наблюдаемых и выдает значения только из наблюдаемых ранее в списке, чем любые предыдущие выбросы.

Вот этот оператор выполняется с помощью цикла событий.

 function prefer<T, R>(...observables: Observable<R>[]): Observable<R>{
  return new Observable(observer => {

    const subscrptions = new Array<Subscription>();
    const unsub = (index) => {
      for(let i = index; i < subscrptions.length; i  ){
        subscrptions[i].unsubscribe();
      }
    }

    observables.map(stream => stream.pipe(
      delay(0)
    )).forEach((stream, index) => 
      subscrptions.push(stream.subscribe(payload => {
        observer.next(payload);
        unsub(index   1);
        subscrptions.length = index   1;
      }))
    );

    return { unsubscribe: () => unsub(0) }
  })
}
 

а затем без цикла событий и без unsubscribe() .

 function prefer<T, R>(...observables: Observable<R>[]): Observable<R>{
  return defer(() => {

    const wUnsub = observables.map((stream, index) => ({
      stream: stream.pipe(
        map(payload => ({index, payload}))
      ), 
      unsub: new Subject()
    }));

    const unsub = (index) => {
      for(let i = index; i < wUnsub.length; i  ){
        wUnsub[i].unsub.next();
        wUnsub[i].unsub.complete();
      }
    }
    
    return merge(...wUnsub.map(build => build.stream.pipe(
      takeUntil(build.unsub)
    ))).pipe(
      tap(({index}) => {
        unsub(index   1);
        wUnsub.length = index   1;
      }),
      map(({payload}) => payload),
      finalize(() => unsub(0))
    );
  });
}
 

Также здесь используется оператор

 prefer(
  interval(10000).pipe(
    take(5),
    map(_ => "Every 10s")
  ),
  interval(5000).pipe(map(_ => "Every 5s")),
  interval(1000).pipe(map(_ => "Every 1s")),
  interval(250).pipe(map(_ => "Every 1/4s"))
).subscribe(console.log);
 

Представьте себе использование этого оператора в масштабе. Относительно легко понять, что объем памяти при первом подходе намного меньше, чем при втором подходе (O (n) vs O (n * n) использование памяти).


Наконец; Вопрос

Поскольку (в javascript) синхронный код выполняется до завершения перед выполнением любого другого кода, кажется, не имеет смысла иметь доступ к наблюдаемым subscription до того, как вернется синхронный раздел этой подписки. Тем не менее, в качестве средства раннего прерывания потока кажется, что возможность subscription раннего доступа к потоку может иметь преимущества (по крайней мере, в памяти).

Существует ли (относительно) элегантный способ использования Observable для решения этих проблем?

Ответ №1:

Это очень интересный вопрос.

Здесь будет другой подход, который работает только для RxJS 6.x:

 const sub = from([1, 2, 3, 4, 5]).subscribe(function(x) {
  if (x > 3) this.unsubscribe();
  console.log(x);
});
/*
1
2
3
4
*/
 

Это работает subscribe , потому что обратный вызов будет присвоен _next свойству SafeSubscriber , которое будет создано при использовании .subscribe .

 let context: any = this;

...

next = (<((value: T) => void)> observerOrNext);

...


this._context = context;
this._next = next;
...
 

Затем, когда Subscriber next будет вызван метод ‘s, под капотом произойдет что-то вроде next.call(this.context) , где this.context ссылается на текущий SafeSubscriber экземпляр. Когда SafeSubscriber.unsubscribe() вызывается, это в конечном итоге приведет к отказу от подписки всей цепочки подписчиков.

Однако это имеет другую реализацию в RxJS 7, в результате этот подход больше не будет работать.

Комментарии:

1. Это довольно круто. Никогда не рассматривал контекст, заданный для обратных вызовов, поскольку я привык использовать лямбда-нотацию.

2. @MrkSef да, я всегда за анонимные функции, но иногда требуется изменение : D

Ответ №2:

Я понял, что вы хотите отказаться от подписки на наблюдаемый, когда ваше значение больше 3.

 const sub = from([1,2,3,4,5]).subscribe(x => {
  if(x > 3) sub.unsubscribe();
  console.log(x);
});
 

Вам действительно нужна подписка?
Я бы предпочел использовать оператор takeWhile rxjs.

 from([1,2,3,4,5]).pipe(
  takeWhile(x <= 3),
).subscribe(x => console.log(x));
 

Комментарии:

1. take / takeWhile хорошо работает, когда все просто, как в первом примере, но поможет ли это мне реализовать prefer описанный выше оператор? Я не понимаю, как. Как только вы управляете парком потоков, а отмена подписки основана на их объединенных значениях, мне кажется, что take / takeWhile уже недостаточно выразительны.

Ответ №3:

Хорошо, итак, вот я отвечаю на свой собственный вопрос. Проработав над этим слишком долго, я наткнулся на тот факт, что, оказывается, RxJS поставляется с довольно хорошим встроенным решением. Это очень хорошо только потому, что он использует publish/connect то, что, по-видимому, реализовано с субъектами внутри (хотя объем памяти все еще лучше? Не уверен, почему).

На самом деле это не предполагаемое использование publish/connect , поскольку я не использую многоадресную рассылку. Ключ в том, что ConnectableObservables начинаются не с subscribe , а с connect .

Вы можете использовать это, чтобы добиться желаемого поведения, вообще не полагаясь на цикл событий.

Решение с использованием публикации

Мини-пример:

 const stream = publish()(from([1,2,3,4,5]));

const sub = stream.subscribe(x => {
  if(x > 3) sub.unsubscribe();
  console.log(x);
});

stream.connect();
 

Масштабируется до пользовательского оператора:

 function prefer<T, R>(...observables: Observable<R>[]): Observable<R>{
  return new Observable(observer => {

    const subscrptions = new Array<Subscription>();
    const unsub = (index = 0) => {
      for(let i = index; i < subscrptions.length; i  ){
        subscrptions[i].unsubscribe();
      }
    }

    observables
      .map(stream => publish()(stream))
      .map((stream, index) => {
        subscrptions.push(stream.subscribe((payload: R) => {
          observer.next(payload);
          unsub(index   1);
          subscrptions.length = index   1;
        }));
        return stream;
      })
      .forEach(stream => stream.connect());

    return { unsubscribe: () => unsub() }
  })
}
 

Разница:

Используя цикл событий следующим образом:

 const stream = from([1,2,3,4,5]).pipe(
  delay(0)
);

console.log("before subscribe");
const sub = stream.subscribe(x => {
  if(x > 3) sub.unsubscribe();
  console.log(x);
});
console.log("after subscribe");
 

Вывод: "before subscribe" "after subscribe" 1 2 3 4

В соответствии с использованием connect следующим образом:

 const stream = publish()(from([1,2,3,4,5]));

const sub = stream.subscribe(x => {
  if(x > 3) sub.unsubscribe();
  console.log(x);
});

console.log("before connect");
stream.connect();
console.log("after connect");
 

Вывод: "before connect" 1 2 3 4 "after connect"

Поскольку соединение сохраняет синхронные наблюдаемые, unsubscribe оно все равно может выполняться синхронно, и весь поток наблюдаемых обрабатывается до запуска строки after connect . Это довольно большая победа.