Rx: запустить код после завершения всех подписок для одной «следующей»

#ecmascript-6 #rxjs #reactive-extensions-js

#ecmascript-6 #rxjs #реактивные расширения-js

Вопрос:

Допустим, у меня есть тема где-то в моем коде

 mySubject$ = new Subject()
  

В другом месте кода создается любое количество подписок, например

 someSubscription = mySubject$.subscribe(() => console.log('I love streams'))
anotherSubscription =  mySubject$.subscribe(() => console.log('me too!'))
  

Когда я .next() открываю тему, обе подписки срабатывают, как и ожидалось.

Как я могу подключиться к механизму rx, чтобы я мог запустить некоторый код после того, как все подписки завершат выполнение своего кода (для этого единственного next события)?

Есть ли что-то вроде mySubject$.onAllSubscriptionsCompleted(() => console.log('all done')

Или каковы альтернативы для достижения этого?

Комментарии:

1. Почему вам нужно выполнить что-то подобное, не могли бы вы описать основную проблему, которую вы пытаетесь решить? (Потому что похоже, что проблема не в ReactiveX)

Ответ №1:

RxJS не имеет такого обратного (восходящего?) распространения событий, если только это не подписка / отмена подписки.

Строго говоря: это не Rx-способ установить такую связь между подписчиком и наблюдаемым.

Тем не менее, есть способы добиться этого:

  • у вас могли бы быть другие субъекты для отправки «обработанных» уведомлений и обработки из вашего observable. Кошмарная вещь для поддержки, имхо

  • попросите своих подписчиков повторно подписаться и отреагировать на это в вашем observable. Предположение о распространении этого события подписки / отказа от подписки. Кажется еще более кошмарным. И это должно

  • имейте какое-то обратное давление, простейшим из которых является concatMap , внутри которого вы могли бы выполнять всю свою обработку. Тем не менее, при таком подходе вам придется переосмыслить эти отношения генератор-обработчики

Вот грубая иллюстрация потока, использующего concatMap

                                 handler1
source$ -> concatMap( forkJoin( handler2 ) ) -> subscribe( here all handlers finished )
                                handler3
  

Если вы хотите узнать больше о противодавлении, вот статья о методах противодавления в RxJS