#rx-java #rx-java2
Вопрос:
У меня есть некоторые буферизованные данные в потоке, которые я отправляю, как только буфер заполнится. если только мы не в автономном режиме, в этом случае я хочу заблокировать, пока мы не вернемся в Сеть.
Код, который у меня есть без автономного бита, таков:
dataSubject
.toFlowable(BackpressureStrategy.BUFFER)
.compose(
Transformers
.buffer(
100,
5L,
TimeUnit.MINUTES,
ScheduleProvider.computation
)
)
.concatMapCompletable { dataSender.sendData(it) }
.subscribe(/* deal with errors etc.. */)
У меня есть еще одна наблюдаемая networkConnectivity: Flowable<Connectivity>
Часть, которую я не могу понять, — это как использовать это, чтобы ограничить отправку вышеуказанного кода только в режиме онлайн.
Сначала я пытался Flowables.combineLatest
, но это создает дубликаты отправляемых данных, так как, если мы перейдем в автономный режим и вернемся в онлайн (но данные не изменятся), будут отправлены одни и те же данные.
Я тоже пытался
networkConnectivity
.flatMap {
if (it.isConnected()) {
dataSubject
.toFlowable(BackpressureStrategy.BUFFER)
.compose(
Transformers
.buffer(
100,
5L,
TimeUnit.MINUTES,
ScheduleProvider.computation
)
)
} else {
Flowable.empty()
}
}
Но объект данных не подписан до тех пор, пока мы не подключимся, и к тому времени мы пропустили много данных. Создание объекта данных в качестве объекта ReplaySubject не кажется хорошей идеей, так как это приведет к повторной отправке всего.
Любая помощь будет очень признательна!
Ответ №1:
Если дублирование данных-единственная проблема , с которой вы сталкиваетесь Flowables.combineLatest
, то вы можете попробовать добавить distinctUntilChanged
оператора позже combineLatest
. Он будет фильтровать дублированные данные. Вот схема того, как это работает: https://rxmarbles.com/#distinctUntilChanged
Комментарии:
1. Спасибо за идею. Я немного подумал об этом, я думаю, что у меня все еще есть проблема с тем, что если я нахожусь в автономном режиме и получаю несколько буферизованных элементов, я получу только последние.