#swift #rx-swift #reactivex
#swift #rx-swift #reactivex
Вопрос:
Я относительно новичок в RxSwift, но я с нетерпением жду возможности использовать его в своих проектах, и я хотел бы услышать отзывы об операторе, который я только что написал.
Функциональность, которую мне не хватает, — это деблокированный буфер: буфер, который ведет себя точно так же, как debounce
оператор, но вместо того, чтобы выдавать только последнее значение, он должен выдавать все собранные значения с момента последнего выпуска.
В RxJava это легко достижимо, используя буфер с другим наблюдаемым в качестве «закрывающего селектора»:
// From: https://github.com/ReactiveX/RxJava/wiki/Backpressure
//
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);
В RxSwift, хотя эта версия оператора буфера не существует (я думаю, что эта проблема связана: https://github.com/ReactiveX/RxSwift/issues/590 ), поэтому я попытался решить эту проблему сам.
Мой первый подход заключался в том, чтобы просто создать деблокированный буфер:
extension ObservableType {
func debouncedBuffer(_ dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> {
var valueBuffer: [E] = []
let observable = self.do(onNext: { (value) in
valueBuffer.append(value)
}, onError: { (error) in
valueBuffer = []
}, onCompleted: {
valueBuffer = []
}, onSubscribe: {
valueBuffer = []
}, onDispose: {
valueBuffer = []
}).debounce(dueTime, scheduler: scheduler).flatMap { (value) -> Observable<[E]> in
let emitValues = valueBuffer
valueBuffer = []
return Observable<[E]>.just(emitValues)
}
return observable
}
}
Мой второй подход заключался в создании буфера, для которого любое условие закрытия (например, версия RxJava):
extension ObservableType {
func buffer<R>(_ selector: Observable<R>) -> Observable<[E]> {
var valueBuffer: [E] = []
return Observable.create { observer in
let selectorSubscription = selector.subscribe(onNext: { (value) in
let emitValues = valueBuffer
valueBuffer = []
observer.on(.next(emitValues))
}, onError: { (error) in
valueBuffer = []
observer.on(.error(error))
}, onCompleted: {
valueBuffer = []
observer.on(.completed)
}, onDisposed: {
valueBuffer = []
})
let subscription = self.subscribe(onNext: { (value) in
valueBuffer.append(value)
}, onError: { (error) in
observer.on(.error(error))
selectorSubscription.dispose()
}, onCompleted: {
observer.on(.completed)
selectorSubscription.dispose()
}, onDisposed: {
observer.on(.completed)
selectorSubscription.dispose()
})
return subscription
}
}
}
Я протестировал эти два оператора, и они, похоже, работают, также протестировал обработку различных комбинаций событий onError, onDispose и OnCompleted.
Но я все равно хотел бы услышать отзывы от более опытных людей, если это, по крайней мере, приемлемое решение без утечек, и если я нарушаю какие-либо контракты RX.
Я также создал pasterbin с некоторым тестовым кодом: http://pastebin.com/1iAbUPf8
Комментарии:
1. Я бы посоветовал вам предложить PR для RxSwiftExt , а также проверить канал RxSwift Slack .
2. Спасибо, канал slack — хорошая идея, и я подумаю о создании PR.
Ответ №1:
Вот мой для buffer(bufferOpenings, bufferClosingSelector)
. Это может потребовать дальнейшего рассмотрения.
extension ObservableType {
func buffer<R>(bufferOpenings: Observable<R>, bufferClosingSelector: (R)->Observable<R>) -> Observable<[E]> {
var valueBuffer: [E]? = nil
let operatorObservable = Observable<[E]>.create({ observer in
let subject = PublishSubject<[E]>()
let closingsSub = bufferOpenings
.doOnNext({ _ in
valueBuffer = []
})
.flatMap({ opening in
return bufferClosingSelector(opening)
})
.subscribeNext({ _ in
if let vb = valueBuffer {
subject.onNext(vb)
}
valueBuffer = nil
}
)
let bufferSub = self.subscribe(
onNext: { value in
valueBuffer?.append(value)
},
onError: { error in
subject.onError(error)
},
onCompleted: {
subject.onCompleted()
},
onDisposed: {
}
)
let subjectSub = subject.subscribe(
onNext: { (value) in
observer.onNext(value)
},
onError: { (error) in
observer.onError(error)
},
onCompleted: {
observer.onCompleted()
},
onDisposed: {
}
)
let combinedDisposable = CompositeDisposable()
combinedDisposable.addDisposable(closingsSub)
combinedDisposable.addDisposable(bufferSub)
combinedDisposable.addDisposable(subjectSub)
return combinedDisposable
})
return operatorObservable
}
}