Реализация деблокированного буфера с помощью RxSwift, это правильно?

#swift #rx-swift #reactivex

#swift #rx-swift #reactivex

Вопрос:

Я относительно новичок в RxSwift, но я с нетерпением жду возможности использовать его в своих проектах, и я хотел бы услышать отзывы об операторе, который я только что написал.

Функциональность, которую мне не хватает, — это деблокированный буфер: буфер, который ведет себя точно так же, как debounce оператор, но вместо того, чтобы выдавать только последнее значение, он должен выдавать все собранные значения с момента последнего выпуска.

В RxJava это легко достижимо, используя буфер с другим наблюдаемым в качестве «закрывающего селектора»:

 // From: https://github.com/ReactiveX/RxJava/wiki/Backpressure
//
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);
  

В RxSwift, хотя эта версия оператора буфера не существует (я думаю, что эта проблема связана: https://github.com/ReactiveX/RxSwift/issues/590 ), поэтому я попытался решить эту проблему сам.


Мой первый подход заключался в том, чтобы просто создать деблокированный буфер:

 extension ObservableType {
    func debouncedBuffer(_ dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> {
        var valueBuffer: [E] = []

        let observable = self.do(onNext: { (value) in
            valueBuffer.append(value)
        }, onError: { (error) in
            valueBuffer = []
        }, onCompleted: {
            valueBuffer = []
        }, onSubscribe: {
            valueBuffer = []
        }, onDispose: {
            valueBuffer = []
        }).debounce(dueTime, scheduler: scheduler).flatMap { (value) -> Observable<[E]> in
            let emitValues = valueBuffer
            valueBuffer = []
            return Observable<[E]>.just(emitValues)
        }

        return observable
    }
}
  

Мой второй подход заключался в создании буфера, для которого любое условие закрытия (например, версия RxJava):

 extension ObservableType {
    func buffer<R>(_ selector: Observable<R>) -> Observable<[E]> {
        var valueBuffer: [E] = []

        return Observable.create { observer in
            let selectorSubscription = selector.subscribe(onNext: { (value) in
                let emitValues = valueBuffer
                valueBuffer = []
                observer.on(.next(emitValues))
            }, onError: { (error) in
                valueBuffer = []
                observer.on(.error(error))
            }, onCompleted: {
                valueBuffer = []
                observer.on(.completed)
            }, onDisposed: {
                valueBuffer = []
            })

            let subscription = self.subscribe(onNext: { (value) in
                valueBuffer.append(value)
            }, onError: { (error) in
                observer.on(.error(error))
                selectorSubscription.dispose()
            }, onCompleted: {
                observer.on(.completed)
                selectorSubscription.dispose()
            }, onDisposed: {
                observer.on(.completed)
                selectorSubscription.dispose()
            })
            return subscription
        }
    }
}
  

Я протестировал эти два оператора, и они, похоже, работают, также протестировал обработку различных комбинаций событий onError, onDispose и OnCompleted.

Но я все равно хотел бы услышать отзывы от более опытных людей, если это, по крайней мере, приемлемое решение без утечек, и если я нарушаю какие-либо контракты RX.

Я также создал pasterbin с некоторым тестовым кодом: http://pastebin.com/1iAbUPf8

Комментарии:

1. Я бы посоветовал вам предложить PR для RxSwiftExt , а также проверить канал RxSwift Slack .

2. Спасибо, канал slack — хорошая идея, и я подумаю о создании PR.

Ответ №1:

Вот мой для buffer(bufferOpenings, bufferClosingSelector) . Это может потребовать дальнейшего рассмотрения.

 extension ObservableType {

    func buffer<R>(bufferOpenings: Observable<R>, bufferClosingSelector: (R)->Observable<R>) -> Observable<[E]> {
        var valueBuffer: [E]? = nil

        let operatorObservable = Observable<[E]>.create({ observer in
            let subject = PublishSubject<[E]>()

            let closingsSub = bufferOpenings
                .doOnNext({ _ in
                    valueBuffer = []
                })
                .flatMap({ opening in
                    return bufferClosingSelector(opening)
                })
                .subscribeNext({ _ in
                    if let vb = valueBuffer {
                        subject.onNext(vb)
                    }
                    valueBuffer = nil
                }
            )

            let bufferSub = self.subscribe(
                onNext: { value in
                    valueBuffer?.append(value)
                },
                onError: { error in
                    subject.onError(error)
                },
                onCompleted: {
                    subject.onCompleted()
                },
                onDisposed: {
                }
            )

            let subjectSub = subject.subscribe(
                onNext: { (value) in
                    observer.onNext(value)
                },
                onError: { (error) in
                    observer.onError(error)
                },
                onCompleted: {
                    observer.onCompleted()
                },
                onDisposed: {
                }
            )

            let combinedDisposable = CompositeDisposable()

            combinedDisposable.addDisposable(closingsSub)
            combinedDisposable.addDisposable(bufferSub)
            combinedDisposable.addDisposable(subjectSub)

            return combinedDisposable

        })

        return operatorObservable
    }

}