Почему использование общего доступа блокирует наблюдаемое использование withLatestFrom?

#rxjs

Вопрос:

Проблема

У меня есть следующие операторы:

   const prepare = (value$: Observable<string>) =>
    value$.pipe(
      tap((x) => console.log("prepare: ", x)),
      share()
    );

  const performTaskA = (removed$: Observable<string>) =>
    removed$.pipe(tap((x) => console.log("taskA: ", x)));

  const performTaskB = (removed$: Observable<string>) =>
    removed$.pipe(
      tap((x) => console.log("taskB 1: ", x)),
      withLatestFrom(otherValue$),
      tap((x) => console.log("taskB 2: ", x))
    );
 

и я называю их так:

   const prepared$ = value$.pipe(prepare);
  const taskADone$ = prepared$.pipe(performTaskA);
  const taskBDone$ = prepared$.pipe(performTaskB);

  merge(taskADone$, taskBDone$).subscribe();
 

в результате получается следующий результат:

 prepare:  TEST 
taskA:  TEST 
taskB 1:  TEST
 

Обратите внимание, что это taskB 2 не регистрируется — похоже taskBDone , что наблюдаемое остановилось на withLatestFrom(otherValue$) входе performTaskB .

Если share вход prepare удален, наблюдаемое не останавливается, но это (неудивительно) приводит к prepare выполнению дважды, чего я не хочу.

Вопросы

  1. Как я могу выполнить оба performTaskA и performTaskB , но prepare только один раз?
  2. Учитывая приведенное ниже объяснение отладки, почему share происходит изменение последовательности эмиссии?

ДЕМОНСТРАЦИЯ
С долей (как указано выше): https://codesandbox.io/s/so-share-with-latest-from-with-share-rtyex?file=/src/index.ts:663-853
Без доли: https://codesandbox.io/s/so-share-with-latest-from-no-share-p702e

Перейдите на вкладку Тесты справа, убедитесь, что консоль видна, и нажмите кнопку Воспроизведения.

Частичное Объяснение
Отладка withLatestFrom очевидно, что когда источник( removed$ ) испускает, ready находится false здесь, что предотвращает излучение.

Это происходит потому, что при share наличии подписка input( otherValue$ ) выдается после источника, поэтому ready она еще не установлена. (Или это то, что share вызвало более раннее излучение источника?)

Но когда share он удален, входная подписка выводится до того, как источник имеет значение, значение ready устанавливается здесь и здесь, и, следовательно withLatestFrom , выводится, как и ожидалось.

Комментарии:

1. в performTaskB , вы используете value$ непосредственно в withLatestFrom — не общую версию, возвращенную prepare . Но на самом removed$ деле аргумент является общим value$ , поэтому мы используем value$ в этой функции две разные версии. Это намеренно?

2. Это было намеренно, потому что это показывало проблему, с которой я столкнулся. Однако я отредактировал вопрос так, что теперь withLatestFrom он ссылается на отдельную переменную, которая ближе к моему примеру в реальном мире и все еще показывает проблему.

3. Спасибо — я думаю, что объяснение может быть похожим, но не идентичным вашему другому сегодняшнему вопросу. Какой «ТЕСТ» можно наблюдать (а также otherValue$ ), который вы проходите в этом случае?

4. Я передаю их как холодные наблюдаемые объекты через TestScheduler /мрамор — см. codesandbox.io/s/… — что несколько имитирует мое приложение в реальном мире, где value$ есть Subject и $otherValue подключено к селектору ngrx. В любом случае, у меня та же проблема здесь и в моем приложении. Спасибо.

5. @backtick Я немного углубился в проблему и соответствующим образом обновил вопрос.

Ответ №1:

Я попытался запустить ваш код и не смог заставить его зависнуть так, как это делает ваш. Я подозреваю, что это связано с тем, как ваша платформа тестирования управляет наблюдаемыми объектами. Я не могу повторить это.

Я заметил, что есть некоторые проблемы с чередованием/упорядочением, присущие тому, что вы написали. Совместное использование синхронных наблюдаемых объектов связано с такими проблемами, как синхронное наблюдение первым наблюдателем всех исходных значений и его завершение до того, как второй наблюдатель сможет подписаться. Это происходит, даже если они должны быть подписаны «в одно и то же время».

Упорядочение с помощью цикла событий

То, что я здесь написал, работает на меня:

 function init(
  value$: Observable<string>,
  otherValue$: Observable<string>
) {
  const prepare = pipe(
    tap(x => console.log("prepare: ", x)),
    delay(0),
    share()
  );

  const performTaskA = pipe(
    tap(x => console.log("taskA: ", x))
  );

  const performTaskB = pipe(
    tap(x => console.log("taskB 1: ", x)),
    withLatestFrom(otherValue$),
    tap(x => console.log("taskB 2: ", x))
  );

  const prepared$ = value$.pipe(prepare);

  merge(
    prepared$.pipe(performTaskA), 
    prepared$.pipe(performTaskB)
  ).subscribe();
}

init(of("a"), of("b"));
 

Для меня это выводит это на консоль:

 prepare: a
taskA: a
taskB 1: a
taskB 2: ["a","b"]
 

Вы заметите вызов с задержкой(0) внутри prepare . Без этого подготовка вызывается дважды, так как общее наблюдаемое синхронно завершено. Задержка(0) просто помещает следующий вызов в очередь событий как можно скорее.

Это не лучшее решение. Это взлом. Лучшее решение зависит от того, как это используется. Большую часть времени shareReplay(1) выполняет свою работу. Если вы используете это здесь, это сработает.

Заказ с помощью Публикации/подключения

В противном случае вы можете опубликовать и подключиться, чтобы убедиться, что заказ соответствует вашим ожиданиям.

Публикация/Подключение позволяет настроить все ваши подписки на источник до того, как источник будет официально запущен/подключен/подписан. Это гарантирует, что под капотом все обратные вызовы и будут на месте до того, как что-либо произойдет. Это единственный способ гарантировать, что полностью синхронный наблюдаемый объект может делиться своими значениями.

 function init(
  value$: Observable<string>,
  otherValue$: Observable<string>
) {
  const prepare = pipe(
    tap(x => console.log("prepare: ", x)),
    share()
  );

  const performTaskA = pipe(
    tap(x => console.log("taskA: ", x))
  );

  const performTaskB = pipe(
    tap(x => console.log("taskB 1: ", x)),
    withLatestFrom(otherValue$),
    tap(x => console.log("taskB 2: ", x))
  );

  const prepared$ = publish()(value$.pipe(prepare));

  merge(
    prepared$.pipe(performTaskA), 
    prepared$.pipe(performTaskB)
  ).subscribe();

  prepared$.connect();
}
 

Комментарии:

1. Большое спасибо за ваш ответ и приношу извинения за задержку с ответом. Я ценю объяснение, однако 1-го раствора не представляет свой сценарий (где вход наблюдаемых не в комплекте), а 2-й раствор по-прежнему выполняет ту же проблему как у меня изначально были — см. codesandbox.Ио/с/… — запуск теста от тестов вкладке справа и соблюдать вывод на консоль в правом нижнем углу. Говоря, что с тех пор я нашел другой подход, так что не чувствуйте себя обязанным смотреть на это дальше!

2. Я подозреваю, что это нечто более близкое к тому, как ваша платформа тестирования управляет наблюдаемыми объектами, а не проблема с вашим пониманием RxJS как такового. В любом случае, я рад, что вы что-то поняли 🙂