#rxjs
Вопрос:
Проблема
У меня есть следующие операторы:
const prepare = (value$: Observable<string>) =>
value$.pipe(
tap((x) => console.log("prepare: ", x)),
share()
);
const performTaskA = (removed$: Observable<string>) =>
removed$.pipe(tap((x) => console.log("taskA: ", x)));
const performTaskB = (removed$: Observable<string>) =>
removed$.pipe(
tap((x) => console.log("taskB 1: ", x)),
withLatestFrom(otherValue$),
tap((x) => console.log("taskB 2: ", x))
);
и я называю их так:
const prepared$ = value$.pipe(prepare);
const taskADone$ = prepared$.pipe(performTaskA);
const taskBDone$ = prepared$.pipe(performTaskB);
merge(taskADone$, taskBDone$).subscribe();
в результате получается следующий результат:
prepare: TEST
taskA: TEST
taskB 1: TEST
Обратите внимание, что это taskB 2
не регистрируется — похоже taskBDone
, что наблюдаемое остановилось на withLatestFrom(otherValue$)
входе performTaskB
.
Если share
вход prepare
удален, наблюдаемое не останавливается, но это (неудивительно) приводит к prepare
выполнению дважды, чего я не хочу.
Вопросы
- Как я могу выполнить оба
performTaskA
иperformTaskB
, ноprepare
только один раз? - Учитывая приведенное ниже объяснение отладки, почему
share
происходит изменение последовательности эмиссии?
ДЕМОНСТРАЦИЯ
С долей (как указано выше): https://codesandbox.io/s/so-share-with-latest-from-with-share-rtyex?file=/src/index.ts:663-853
Без доли: https://codesandbox.io/s/so-share-with-latest-from-no-share-p702e
Перейдите на вкладку Тесты справа, убедитесь, что консоль видна, и нажмите кнопку Воспроизведения.
Частичное Объяснение
Отладка withLatestFrom
очевидно, что когда источник( removed$
) испускает, ready
находится false
здесь, что предотвращает излучение.
Это происходит потому, что при share
наличии подписка input( otherValue$
) выдается после источника, поэтому ready
она еще не установлена. (Или это то, что share
вызвало более раннее излучение источника?)
Но когда share
он удален, входная подписка выводится до того, как источник имеет значение, значение ready
устанавливается здесь и здесь, и, следовательно withLatestFrom
, выводится, как и ожидалось.
Комментарии:
1. в
performTaskB
, вы используетеvalue$
непосредственно вwithLatestFrom
— не общую версию, возвращеннуюprepare
. Но на самомremoved$
деле аргумент является общимvalue$
, поэтому мы используемvalue$
в этой функции две разные версии. Это намеренно?2. Это было намеренно, потому что это показывало проблему, с которой я столкнулся. Однако я отредактировал вопрос так, что теперь
withLatestFrom
он ссылается на отдельную переменную, которая ближе к моему примеру в реальном мире и все еще показывает проблему.3. Спасибо — я думаю, что объяснение может быть похожим, но не идентичным вашему другому сегодняшнему вопросу. Какой «ТЕСТ» можно наблюдать (а также
otherValue$
), который вы проходите в этом случае?4. Я передаю их как холодные наблюдаемые объекты через
TestScheduler
/мрамор — см. codesandbox.io/s/… — что несколько имитирует мое приложение в реальном мире, гдеvalue$
естьSubject
и$otherValue
подключено к селектору ngrx. В любом случае, у меня та же проблема здесь и в моем приложении. Спасибо.5. @backtick Я немного углубился в проблему и соответствующим образом обновил вопрос.
Ответ №1:
Я попытался запустить ваш код и не смог заставить его зависнуть так, как это делает ваш. Я подозреваю, что это связано с тем, как ваша платформа тестирования управляет наблюдаемыми объектами. Я не могу повторить это.
Я заметил, что есть некоторые проблемы с чередованием/упорядочением, присущие тому, что вы написали. Совместное использование синхронных наблюдаемых объектов связано с такими проблемами, как синхронное наблюдение первым наблюдателем всех исходных значений и его завершение до того, как второй наблюдатель сможет подписаться. Это происходит, даже если они должны быть подписаны «в одно и то же время».
Упорядочение с помощью цикла событий
То, что я здесь написал, работает на меня:
function init(
value$: Observable<string>,
otherValue$: Observable<string>
) {
const prepare = pipe(
tap(x => console.log("prepare: ", x)),
delay(0),
share()
);
const performTaskA = pipe(
tap(x => console.log("taskA: ", x))
);
const performTaskB = pipe(
tap(x => console.log("taskB 1: ", x)),
withLatestFrom(otherValue$),
tap(x => console.log("taskB 2: ", x))
);
const prepared$ = value$.pipe(prepare);
merge(
prepared$.pipe(performTaskA),
prepared$.pipe(performTaskB)
).subscribe();
}
init(of("a"), of("b"));
Для меня это выводит это на консоль:
prepare: a
taskA: a
taskB 1: a
taskB 2: ["a","b"]
Вы заметите вызов с задержкой(0) внутри prepare
. Без этого подготовка вызывается дважды, так как общее наблюдаемое синхронно завершено. Задержка(0) просто помещает следующий вызов в очередь событий как можно скорее.
Это не лучшее решение. Это взлом. Лучшее решение зависит от того, как это используется. Большую часть времени shareReplay(1) выполняет свою работу. Если вы используете это здесь, это сработает.
Заказ с помощью Публикации/подключения
В противном случае вы можете опубликовать и подключиться, чтобы убедиться, что заказ соответствует вашим ожиданиям.
Публикация/Подключение позволяет настроить все ваши подписки на источник до того, как источник будет официально запущен/подключен/подписан. Это гарантирует, что под капотом все обратные вызовы и будут на месте до того, как что-либо произойдет. Это единственный способ гарантировать, что полностью синхронный наблюдаемый объект может делиться своими значениями.
function init(
value$: Observable<string>,
otherValue$: Observable<string>
) {
const prepare = pipe(
tap(x => console.log("prepare: ", x)),
share()
);
const performTaskA = pipe(
tap(x => console.log("taskA: ", x))
);
const performTaskB = pipe(
tap(x => console.log("taskB 1: ", x)),
withLatestFrom(otherValue$),
tap(x => console.log("taskB 2: ", x))
);
const prepared$ = publish()(value$.pipe(prepare));
merge(
prepared$.pipe(performTaskA),
prepared$.pipe(performTaskB)
).subscribe();
prepared$.connect();
}
Комментарии:
1. Большое спасибо за ваш ответ и приношу извинения за задержку с ответом. Я ценю объяснение, однако 1-го раствора не представляет свой сценарий (где вход наблюдаемых не в комплекте), а 2-й раствор по-прежнему выполняет ту же проблему как у меня изначально были — см. codesandbox.Ио/с/… — запуск теста от тестов вкладке справа и соблюдать вывод на консоль в правом нижнем углу. Говоря, что с тех пор я нашел другой подход, так что не чувствуйте себя обязанным смотреть на это дальше!
2. Я подозреваю, что это нечто более близкое к тому, как ваша платформа тестирования управляет наблюдаемыми объектами, а не проблема с вашим пониманием RxJS как такового. В любом случае, я рад, что вы что-то поняли 🙂