#javascript #rxjs #behaviorsubject
#javascript #rxjs #behaviorsubject
Вопрос:
Я написал следующий код, ожидая, что последняя подписка напечатает [‘отправленное значение 1’, ‘отправленное значение 2’], но вместо этого печатает только [‘отправленное значение 2’] . Это имеет смысл, потому что BehaviorSubject выдает последнее значение, но почему накопитель сканирования сбрасывается до начального значения?
Я обнаружил, что это поведение легко исправить с помощью shareReplay (1), но я не совсем понимаю, почему. Что происходит с аккумулятором сканирования в обоих случаях, при использовании shareReplay (1) или при его неиспользовании?
const subject = new BehaviorSubject(null);
const action$ = subject.asObservable();
const obs$ = action$.pipe(scan((acc, curr) => [...acc, curr], []));
// const obs$ = action$.pipe(scan((acc, curr) => [...acc, curr], []), shareReplay(1));
obs$.subscribe(res => console.log('first subs', res));
subject.next('emitted value 1');
subject.next('emitter value 2');
obs$.subscribe(res => console.log('second subs', res));
Комментарии:
1. итак, у вас есть какие-либо вопросы по поводу ответа ниже?
2. Нет, теперь я это понимаю. Слово cache помогло мне это ясно увидеть.
Ответ №1:
Когда вы подписываетесь на a BehaviorSubject
, вы получите последнее переданное значение. Вот почему вы получаете только : second subs ["emitter value 2"]
в консоли.
shareReplay
Из документа :
Зачем использовать shareReplay?Обычно вы хотите использовать shareReplay, когда у вас есть побочные эффекты или обременительные вычисления, которые вы не хотите выполнять среди нескольких подписчиков. Это также может быть полезно в ситуациях, когда вы знаете, что у вас будут поздние подписчики на поток, которым нужен доступ к ранее переданным значениям. Эта способность воспроизводить значения при подписке — это то, что отличает share и shareReplay .
Это означает, что вы получите все отправленные значения без каких-либо вычислений, это своего рода кэш. Это также видно в консоли, у вас есть только одна печать, которая содержит все значения для второй подписки : second subs [null, "emitted value 1", "emitter value 2"]
Ответ №2:
Когда вы подписываетесь на observable, он получает «run» из-за отсутствия лучшего слова. Это может привести к путанице, поскольку значения распределяются во времени, поэтому мы немного придерживаемся синхронных вызовов. рассмотрим следующее наблюдение.
const getRandom = () => Math.floor(Math.random() * 10);
const stream$ = defer(() => from([getRandom(), getRandom(), getRandom()]));
stream$.subscribe(console.log); // 4 6 2
stream$.subscribe(console.log); // 4 4 2
stream$.subscribe(console.log); // 1 8 6
stream$.subscribe(console.log); // 8 6 7
Обратите внимание, что каждая подписка получает разные номера? Все они перезапускаются, getRandom()
и их подписки получают разные значения.
Если мы вставим накопитель stream$
, каждая подписка будет начинаться с нового начального значения и получать результат в зависимости от того, что излучает их поток.
итак:
const getRandom = () => Math.floor(Math.random() * 10);
const stream$ = defer(
() => from([getRandom(), getRandom(), getRandom()])
).pipe(
reduce((acc, val) => acc val, 0)
);
stream$.subscribe(console.log); // 12 (4 6 2)
stream$.subscribe(console.log); // 10 (4 4 2)
stream$.subscribe(console.log); // 15 (1 8 6)
stream$.subscribe(console.log); // 21 (8 6 7)
Посмотрите, как каждая подписка не добавляется к предыдущей? Это просто суммирование собственных чисел.
Они не разделяют аккумуляторы или значения или что-либо в разных подписках.
Итак, что происходит, когда вы используете что-то вроде shareReplay()
? В этом случае вместо того, чтобы подписываться на stream $ и «запускать» 4 разных раза, он подписывается только один раз, и результаты этой единственной подписки публикуются независимо от того, сколько людей подписались впоследствии.
const getRandom = () => Math.floor(Math.random() * 10);
const stream$ = defer(
() => from([getRandom(), getRandom(), getRandom()])
).pipe(
reduce((acc, val) => acc val, 0),
shareReplay(1)
);
stream$.subscribe(console.log); // 12 (4 6 2)
stream$.subscribe(console.log); // 12
stream$.subscribe(console.log); // 12
stream$.subscribe(console.log); // 12
Здесь наш аккумулятор запускается только один раз.
Если вы измените его следующим образом:
const getRandom = () => Math.floor(Math.random() * 10);
const stream$ = defer(
() => from([getRandom(), getRandom(), getRandom()])
).pipe(
shareReplay(3),
reduce((acc, val) => acc val, 0)
);
stream$.subscribe(console.log); // 12 (4 6 2)
stream$.subscribe(console.log); // 12 (4 6 2)
stream$.subscribe(console.log); // 12 (4 6 2)
stream$.subscribe(console.log); // 12 (4 6 2)
Затем аккумулятор запускается 4 раза, но getRandom()
вызывается только для создания первых 3 значений, после чего каждая подписка получает те же 3 числа.
Итак, чтобы ответить на ваш вопрос:
Когда у вас нет shareReplay, каждый подписчик запускает свой собственный scan
поток, начиная с момента подписки. С shareReply scan
подписывается только один раз. Вы отправляете эти результаты всем будущим подписчикам. В этом разница.