#android #kotlin #rx-java #rx-java3
Вопрос:
Есть ли эксперты RxJava, которые могут помочь мне разобраться в этом? У меня есть функция, которая возвращает Single<ByteArray>
значение на основе некоторого position
параметра. В принципе, я хочу продолжать вызывать эту функцию, пока она не вернет пустое ByteArray
значение .
ПРИМЕЧАНИЕ: Параметр position должен указывать, сколько байтов уже прочитано. Например, в первый раз он будет равен 0, в следующий раз будет указано, сколько байтов было извлечено в первый раз, и так далее.
Это то, что у меня есть сейчас:
fun readBytes(position: Int) : Single<ByteArray>{ ... }
fun readAllLogs() : Single<LogEntry> {
val oldPosition = AtomicInteger(0)
val position = AtomicInteger(0)
Observable.defer {
readBytes(position.get())
}.reduceWith({ LogEntry() }, { log, bytes ->
position.addAndGet(bytes.size)
log = bytes
log
}).repeatUntil {
val canWeStop = position.get() == oldPosition.get()
oldPosition.set(position.get())
canWeStop
}.toObservable()
}
Для полноты картины, вот мой накопитель логов
data class LogEntry {
var data: ByteArray = byteArrayOf()
private set
operator fun plusAssign(bytes: ByteArray) {
data = bytes
}
}
Вопрос 1:
Как я могу элегантно выйти из этого потока? например, не отслеживать отдельные переменные вне потока?
Вопрос 2: Какие-либо оптимизации я могу сделать? Я дважды перехожу от одного к наблюдаемому, чтобы иметь возможность связать эти операторы в цепочку. Я уверен, что это может быть более эффективным
Комментарии:
1. Чтобы отслеживать эти переменные вне потока чистым способом, вы обертываете содержимое журналов чтения с помощью Observable. откладывать. Таким образом, возвращаемое наблюдаемое будет хорошо себя вести в том смысле, что на него можно подписываться много раз без побочных эффектов, связанных с состоянием этих переменных.