#java #rx-java3
#java #rx-java3
Вопрос:
У меня есть Observable<List<Event>>
, и я хочу, чтобы это наблюдаемое было доступно нескольким подписчикам. Каждый подписчик будет фильтровать каждое событие и обрабатывать его.
Он Observable<List<Event>>
был создан таким образом :
@Override
public List<Event> findNewEvents() {
List<Event> results = new ArrayList<>();
while(! fetchedEvents.isEmpty()) {
results.add(fetchedEvents.poll());
}
return results;
}
@Override
public Observable<List<Event>> findNewObservableEvents() {
return Observable.just(findNewEvents());
}
Вот код :
Observable<List<Event>> newEvents = reader.findNewObservableEvents();
Disposable riskApproveRiskEventsDisposable = newEvents.flatMapIterable(riskEvents -> riskEvents)
.flatMap(Observable::just)
.filter(risk::isForRiskApproval)
.subscribe(risk::approveRisk);
Disposable fundingCheckFundabilityEventsDisposable = newEvents.flatMapIterable(riskEvents -> riskEvents)
.flatMap(Observable::just)
.filter(funding::isForFundingFundabilityCheck)
.subscribe(funding::checkFundability);
Disposable fundingFundEventsDisposable = newEvents.flatMapIterable(riskEvents -> riskEvents)
.flatMap(Observable::just)
.filter(funding::isForFundingFund)
.subscribe(funding::fund);
Я пробовал :
newEvents.share()
а также newEvents.publish()
.
При попытке: newEvents.create()
мне нужно указать ObservableOnSubscribe
объект, но я не понимаю, как его получить.
В чем хитрость?
Ответ №1:
Если вы не хотите использовать findNewObservableEvents
несколько раз, используйте publish
и, как только подписчики подписались, вызовите connect
ConnectableObservable
:
ConnectableObservable<List<Event>> newEvents = reader.findNewObservableEvents().publish();
Disposable riskApproveRiskEventsDisposable =
newEvents.flatMapIterable(riskEvents -> riskEvents)
.flatMap(Observable::just)
.filter(risk::isForRiskApproval)
.subscribe(risk::approveRisk);
Disposable fundingCheckFundabilityEventsDisposable =
newEvents.flatMapIterable(riskEvents -> riskEvents)
.flatMap(Observable::just)
.filter(funding::isForFundingFundabilityCheck)
.subscribe(funding::checkFundability);
Disposable fundingFundEventsDisposable =
newEvents.flatMapIterable(riskEvents -> riskEvents)
.flatMap(Observable::just)
.filter(funding::isForFundingFund)
.subscribe(funding::fund);
Disposable connection = newEvents.connect();