RxJava Уменьшает одиночные до тех пор, пока не будет выполнено какое-либо динамическое условие

#android #kotlin #rx-java #rx-java3

Вопрос:

Есть ли эксперты RxJava, которые могут помочь мне разобраться в этом? У меня есть функция, которая возвращает Single<ByteArray> значение на основе некоторого position параметра. В принципе, я хочу продолжать вызывать эту функцию, пока она не вернет пустое ByteArray значение .

ПРИМЕЧАНИЕ: Параметр position должен указывать, сколько байтов уже прочитано. Например, в первый раз он будет равен 0, в следующий раз будет указано, сколько байтов было извлечено в первый раз, и так далее.

Это то, что у меня есть сейчас:

 fun readBytes(position: Int) : Single<ByteArray>{ ... }


fun readAllLogs() : Single<LogEntry> {

    val oldPosition = AtomicInteger(0)
    val position = AtomicInteger(0)
    Observable.defer {
        readBytes(position.get())
    }.reduceWith({ LogEntry() }, { log, bytes -> 
        position.addAndGet(bytes.size)
        log  = bytes
        log
    }).repeatUntil { 

        val canWeStop = position.get() == oldPosition.get()
        oldPosition.set(position.get())
        canWeStop

    }.toObservable()
}
 

Для полноты картины, вот мой накопитель логов

 data class LogEntry {
    var data: ByteArray = byteArrayOf()
        private set


    operator fun plusAssign(bytes: ByteArray) {
        data  = bytes
    }
}
 

Вопрос 1:
Как я могу элегантно выйти из этого потока? например, не отслеживать отдельные переменные вне потока?

Вопрос 2: Какие-либо оптимизации я могу сделать? Я дважды перехожу от одного к наблюдаемому, чтобы иметь возможность связать эти операторы в цепочку. Я уверен, что это может быть более эффективным

Комментарии:

1. Чтобы отслеживать эти переменные вне потока чистым способом, вы обертываете содержимое журналов чтения с помощью Observable. откладывать. Таким образом, возвращаемое наблюдаемое будет хорошо себя вести в том смысле, что на него можно подписываться много раз без побочных эффектов, связанных с состоянием этих переменных.