Как выполнить операцию на основе результата нескольких завершаемых результатов RxJava

#java #kotlin #rx-java #rx-kotlin

#java #kotlin #rx-java #rx-kotlin

Вопрос:

Некоторое время я ломал голову над этим, я теряюсь здесь в управлении требованием, в котором я должен использовать Rx в Kotlin.

Позвольте мне объяснить.

Существует набор идентификаторов, эквивалентные элементы которых необходимо удалить с сервера и, в конечном итоге, в локальном режиме на основе успеха сервера.

В основном

  1. Выполнить сетевой вызов для удаления для одного id (поддерживаемый сетевой вызов возвращает Completable )
  2. если complete получен (успешный) обратный вызов, сохраните id в list (памяти)
  3. Выполните первый и второй шаги для всех id , чтобы удалить
  4. После завершения каждого сетевого вызова передайте список для удаления из локальной базы данных

Итак, доступны эти функции, которые нельзя изменить.

  1. fun deleteId(id: String): Completable { networkCall.deleteId(id) }
  2. fun deleteIds(ids: List<String>): Unit { localDb.deleteId(ids) }

Это то, что я пробовал, но, очевидно, неполный и застрял…

 val deleted = CopyOnWriteArrayList<String>()
val error = CopyOnWriteArrayList<String>()
items?.filter { it.isChecked }
    ?.map { Pair(it.id, dataManager.deleteId(it.id)) }
    ?.forEach { (Id, deleteOp) ->
        deleteOp.subscribeOn(Schedulers.io())
                .subscribe(object: CompletableObserver {
                    override fun onComplete() { deleted.add(Id) }

                    override fun onSubscribe(d: Disposable) { disposableManager  = d }

                    override fun onError(e: Throwable) { error.add(Id) }

                })
    }
  

Итак, теперь здесь есть несколько проблем, одна из них — это требование, при котором я не могу найти место, где можно узнать, что все запросы завершены, чтобы инициировать удаление LocalDB.

Есть ли способ, которым я могу использовать Flowable.fromIterable() or zip или merge каким-либо образом следуя цепочке команд, подобной приведенной выше, для достижения вышеуказанного сценария?

Ответ №1:

Если я правильно понял ваш вариант использования, то это должно сделать:

 // ids of items to delete, for illustration lets have some temp set
val ids = setOf<String>("1", "2", "3", "4")
val deleteIdSingles = mutableListOf<Single<String>>()
ids.forEach { id ->
    deleteIdSingles.add(
        api.deleteId(id)
            // when request successfully completes, return its id wrapped in a Single, instead of Completable
            .toSingle<String> { id }
            // return a flag when this request fails, so that the stream is not closed and other requests would still be executed
            .onErrorReturn { "FAILED" }
    )
}

Single.merge(deleteIdSingles)
    // collect the results of the singles (i.e. the ids of successful deletes), and emit a set of those ids once all the singles has completed
    .collect(
        { mutableListOf() },
        { deletedIds: MutableList<String>, id: String -> if (id != "FAILED") deletedIds.add(id) }
    )
    .observeOn(Schedulers.io())
    .subscribe(
        { deletedIds ->
                db.deleteIds(deletedIds)
        }, { error ->
            // todo: onError
        })