использование CountDownLatch.await() для обеспечения доставки результата

#android #rx-java2 #countdownlatch

#Android #rx-java2 #countdownlatch

Вопрос:

Полный исходный код можно найти здесь: https://github.com/alirezaeiii/SavingGoals-Cache

Это класс LocalDataSource :

 @Singleton
class QapitalLocalDataSource @Inject constructor(
    private val goalsDao: GoalsDao
) : LocalDataSource {

    override fun getSavingsGoals(): Single<List<SavingsGoal>> =
        Single.create { singleSubscriber ->
            goalsDao.getGoals()
                .subscribe {
                    if (it.isEmpty()) {
                        singleSubscriber.onError(NoDataException())
                    } else {
                        singleSubscriber.onSuccess(it)
                    }
                }
        }
}
 

Вышеуказанный метод использовался в классе репозитория :

 @Singleton
class GoalsRepository @Inject constructor(
    private val remoteDataSource: QapitalService,
    private val localDataSource: LocalDataSource,
    private val schedulerProvider: BaseSchedulerProvider
) {

    private var cacheIsDirty = false

    fun getSavingsGoals(): Observable<List<SavingsGoal>> {
        lateinit var goals: Observable<List<SavingsGoal>>
        if (cacheIsDirty) {
            goals = getGoalsFromRemoteDataSource()
        } else {
            val latch = CountDownLatch(1)
            var disposable: Disposable? = null
            disposable = localDataSource.getSavingsGoals()
                .observeOn(schedulerProvider.io())
                .doFinally {
                    latch.countDown()
                    disposable?.dispose()
                }.subscribe({
                    goals = Observable.create { emitter -> emitter.onNext(it) }
                }, { goals = getGoalsFromRemoteDataSource() })
            latch.await()
        }
        return goals
    }
}
 

Как вы видите, я использую CountDownLatch.await(), чтобы убедиться, что результат отправлен в блоке подписки или ошибки. Есть ли лучшее решение, чем использовать CountDownLatch при использовании RxJava?

Комментарии:

1. Я не уверен, чего вы хотите достичь. Вы хотите подождать, пока не будет отправлен хотя бы один элемент (неважно, локальный или удаленный)?

2. Да, точно, когда результат выдается, в блоке finally я выполняю обратный отсчет ()

Ответ №1:

latch.await() блокирует поток, который как бы сводит на нет весь смысл использования асинхронного API, такого как RxJava.

RxJava имеет API onErrorResumeNext -интерфейсы для обработки исключений и toObservable преобразования Single результата в Observable результат.

Кроме того, подобные типы RxJava обычно должны быть холодными (они не запускаются и ничего не вычисляют, пока вы не подпишетесь), поэтому я бы рекомендовал не проверять cacheIsDirty до тех пор, пока не произойдет подписка.

Я бы выбрал что-то вроде:

     fun getSavingsGoals(): Observable<List<SavingsGoal>> {
        return Observable
            .fromCallable { cacheIsDirty }
            .flatMap {
                if (it) {
                    getGoalsFromRemoteDataSource()
                } else {
                    localDataSource.getSavingsGoals()
                        .toObservable()
                        .onErrorResumeNext(getGoalsFromRemoteDataSource())
                }
            }
    }
 

Кстати, если вы уже используете Kotlin, я настоятельно рекомендую сопрограммы. Затем ваш асинхронный код завершает чтение так же, как обычный последовательный код.