Как подписаться нескольким наблюдателям на один наблюдаемый объект с помощью RxJava?

#java #rx-java3

#java #rx-java3

Вопрос:

У меня есть Observable<List<Event>> , и я хочу, чтобы это наблюдаемое было доступно нескольким подписчикам. Каждый подписчик будет фильтровать каждое событие и обрабатывать его.

Он Observable<List<Event>> был создан таким образом :

     @Override
    public List<Event> findNewEvents() {
        List<Event> results = new ArrayList<>();
        while(! fetchedEvents.isEmpty()) {
            results.add(fetchedEvents.poll());
        }
        return results;
    }

    @Override
    public Observable<List<Event>> findNewObservableEvents() {
        return Observable.just(findNewEvents());
    }
 

Вот код :

             Observable<List<Event>> newEvents = reader.findNewObservableEvents();

            Disposable riskApproveRiskEventsDisposable = newEvents.flatMapIterable(riskEvents -> riskEvents)
                    .flatMap(Observable::just)
                    .filter(risk::isForRiskApproval)
                    .subscribe(risk::approveRisk);

            Disposable fundingCheckFundabilityEventsDisposable = newEvents.flatMapIterable(riskEvents -> riskEvents)
                    .flatMap(Observable::just)
                    .filter(funding::isForFundingFundabilityCheck)
                    .subscribe(funding::checkFundability);

            Disposable fundingFundEventsDisposable = newEvents.flatMapIterable(riskEvents -> riskEvents)
                    .flatMap(Observable::just)
                    .filter(funding::isForFundingFund)
                    .subscribe(funding::fund);
 

Я пробовал :

newEvents.share() а также newEvents.publish() .

При попытке: newEvents.create() мне нужно указать ObservableOnSubscribe объект, но я не понимаю, как его получить.

В чем хитрость?

Ответ №1:

Если вы не хотите использовать findNewObservableEvents несколько раз, используйте publish и, как только подписчики подписались, вызовите connect ConnectableObservable :

 ConnectableObservable<List<Event>> newEvents = reader.findNewObservableEvents().publish();

Disposable riskApproveRiskEventsDisposable = 
         newEvents.flatMapIterable(riskEvents -> riskEvents)
                .flatMap(Observable::just)
                .filter(risk::isForRiskApproval)
                .subscribe(risk::approveRisk);

 Disposable fundingCheckFundabilityEventsDisposable = 
         newEvents.flatMapIterable(riskEvents -> riskEvents)
                .flatMap(Observable::just)
                .filter(funding::isForFundingFundabilityCheck)
                .subscribe(funding::checkFundability);

Disposable fundingFundEventsDisposable = 
         newEvents.flatMapIterable(riskEvents -> riskEvents)
                .flatMap(Observable::just)
                .filter(funding::isForFundingFund)
                .subscribe(funding::fund);

Disposable connection = newEvents.connect();