Как может быть реализован аналог onErrorResumeNext для RxJava2 в потоке Kotlin, который возвращает другой поток?

#android #rx-java2 #offline-caching #kotlin-flow

#Android #rx-java2 #автономное кэширование #kotlin-flow

Вопрос:

Я хочу реализовать автономный подход с Flow , сначала попытайтесь извлечь данные из удаленного источника, если это не удается, например, модифицируйте сетевое исключение, я хочу извлечь данные из локального источника с помощью приведенного ниже кода

     return flow { emit(repository.fetchEntitiesFromRemote()) }
        .map {

            println("🍏 getPostFlowOfflineLast() First map in thread: ${Thread.currentThread().name}")

            val data = if (it.isEmpty()) {
                repository.getPostEntitiesFromLocal()
            } else {
                repository.deletePostEntities()
                repository.savePostEntity(it)
                repository.getPostEntitiesFromLocal()
            }

            entityToPostMapper.map(data)
        }
        .catch { cause ->
            println("❌ getPostFlowOfflineLast() FIRST catch with error: $cause, in thread: ${Thread.currentThread().name}")
           flow { emit(repository.getPostEntitiesFromLocal()) }
        }
        .map { postList ->

            println("🎃 getPostFlowOfflineLast() Second map in thread: ${Thread.currentThread().name}")

            ViewState<List<Post>>(
                status = Status.SUCCESS,
                data = postList
            )
        }
        .catch { cause: Throwable ->

            println("❌ getPostFlowOfflineLast() SECOND catch with error: $cause, in thread: ${Thread.currentThread().name}")

            flow {
                emit(
                    ViewState<List<Post>>(
                        Status.ERROR,
                        error = cause
                    )
                )
            }
        }
  

Но он застревает с исключением

 I: ❌ getPostFlowOfflineLast() FIRST catch with error: java.net.UnknownHostException: Unable to resolve host "jsonplaceholder.typicode.com": No address associated with hostname, in thread: main
  

Какой должна быть правильная реализация, чтобы иметь какую-либо наблюдаемость, подобную RxJava onResumeNext , если функция репозитория была наблюдаемой?

 onErrorResumeNext { _: Throwable ->
     Observable.just(repository.getPostEntitiesFromLocal())
}
  

Ответ №1:

Выяснил, что я могу использовать emitAll с потоком для продолжения потока даже несколько раз.

     .catch { cause ->
        println("❌ getPostFlowOfflineLast() FIRST catch with error: $cause, in thread: ${Thread.currentThread().name}")
        emitAll(flow { emit(repository.getPostEntitiesFromLocal()) })
    }
    .map {
        if (!it.isNullOrEmpty()) {
            entityToPostMapper.map(it)
        } else {
            throw EmptyDataException("No data is available!")
        }
    }
    .map { postList ->
        println("🎃 getPostFlowOfflineLast() Third map in thread: ${Thread.currentThread().name}")
        ViewState(status = Status.SUCCESS, data = postList)
    }
    .catch { cause: Throwable ->
        println("❌ getPostFlowOfflineLast() SECOND catch with error: $cause, in thread: ${Thread.currentThread().name}")
        emitAll(flow { emit(ViewState(Status.ERROR, error = cause)) })
    }