Блокировка потока данных до выхода в Сеть

#rx-java #rx-java2

Вопрос:

У меня есть некоторые буферизованные данные в потоке, которые я отправляю, как только буфер заполнится. если только мы не в автономном режиме, в этом случае я хочу заблокировать, пока мы не вернемся в Сеть.

Код, который у меня есть без автономного бита, таков:

     dataSubject
        .toFlowable(BackpressureStrategy.BUFFER)
        .compose(
            Transformers
                .buffer(
                    100,
                    5L,
                    TimeUnit.MINUTES,
                    ScheduleProvider.computation
                )
        ) 
    .concatMapCompletable { dataSender.sendData(it) }
    .subscribe(/* deal with errors etc.. */)
 

У меня есть еще одна наблюдаемая networkConnectivity: Flowable<Connectivity>
Часть, которую я не могу понять, — это как использовать это, чтобы ограничить отправку вышеуказанного кода только в режиме онлайн.

Сначала я пытался Flowables.combineLatest , но это создает дубликаты отправляемых данных, так как, если мы перейдем в автономный режим и вернемся в онлайн (но данные не изменятся), будут отправлены одни и те же данные.

Я тоже пытался

 networkConnectivity
    .flatMap {
        if (it.isConnected()) {
          dataSubject
            .toFlowable(BackpressureStrategy.BUFFER)
            .compose(
                Transformers
                    .buffer(
                        100,
                        5L,
                        TimeUnit.MINUTES,
                        ScheduleProvider.computation
                    )
            ) 
        } else {
            Flowable.empty()
        }
    }
 

Но объект данных не подписан до тех пор, пока мы не подключимся, и к тому времени мы пропустили много данных. Создание объекта данных в качестве объекта ReplaySubject не кажется хорошей идеей, так как это приведет к повторной отправке всего.

Любая помощь будет очень признательна!

Ответ №1:

Если дублирование данных-единственная проблема , с которой вы сталкиваетесь Flowables.combineLatest , то вы можете попробовать добавить distinctUntilChanged оператора позже combineLatest . Он будет фильтровать дублированные данные. Вот схема того, как это работает: https://rxmarbles.com/#distinctUntilChanged

Комментарии:

1. Спасибо за идею. Я немного подумал об этом, я думаю, что у меня все еще есть проблема с тем, что если я нахожусь в автономном режиме и получаю несколько буферизованных элементов, я получу только последние.