Множественные запросы POST с помощью RxJava

#kotlin #post #retrofit #rx-java #concat

#kotlin #Публикация #модернизация #rx-java #конкатенация

Вопрос:

У меня есть список книг.

 open class Book(
    @PrimaryKey
    var id: String? = null,
    var title: String? = null,
    var author: String? = null
): RealmObject()
  

Во время цикла for я фильтрую некоторые книги и создаю Observable из отфильтрованных. Я добавляю каждый наблюдаемый объект в массив.

 val listInserts = ArrayList<Observable<Book>>()
for loop(..) {
  if (localBook condition) {
    val postObservable = networkApiAdapter.insert(localBook)
    listInserts.add(postObservable)
  }
}
  

Я объединяю (или объединяю) наблюдаемые в надежде на некоторые последовательные POST-запросы.

         Observable.concat(listInserts)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                { s ->
                    println(s)
                },
                { err ->
                    println(err)
                },
                { println("onComplete") }
            )
  

На мой сервер MongoDB Flask всегда поступает только один POST-запрос, и я также получаю эту ошибку:

 com.google.gson.JsonSyntaxException: java.lang.IllegalStateException: Expected BEGIN_OBJECT but was NUMBER at line 1 column 4 path $
  

Функции сетевого адаптера, модернизация:

 class NetworkAPIAdapter private constructor() {
    fun insert(dto: Book): Observable<Book> {
        println(dto.toString())
        return bookService.insert(dto.title!!, dto.author!!)
    }

    interface BooksService {
        @FormUrlEncoded
        @POST(URL_ORDERS_ALL)
        fun insert(
            @Field("title") title: String,
            @Field("author") author: String
        ): Observable<Book>
    }
}
  

Любая помощь будет приветствоваться. Я не знаю, как выполнять множественные запросы. Я перепробовал много решений с zip, repeatUntil, flatMap, но ни одно из них не сработало.

После решения этой проблемы я должен удалить все локальные книги и выполнить запрос GET. Все каким-то образом работает с использованием RxJava.

Ответ №1:

Ваш BooksService.insert определяется как POST запрос, который ожидает ответа от сервера в виде Book строки JSON (т.е. Observable<Book> ). Я думаю, что это не то, что ваш сервер возвращает после того, как POST запрос проходит. Проверьте, что возвращает ваш POST запрос, я думаю, что он возвращает что-то Int (вероятно, статус ответа) вместо Book , что объясняет вашу ошибку:

 com.google.gson.JsonSyntaxException: java.lang.IllegalStateException: Expected BEGIN_OBJECT but was NUMBER at line 1 column 4 path $
  

Если это так, то ваш объединенный Observable завершится, как только будет выдана ошибка, т. е. сразу после вашего первого POST запроса, поэтому остальные запросы не будут выполнены.

Если мои предположения были верны, измените свой сервис на:

 interface BooksService {
        @FormUrlEncoded
        @POST(URL_ORDERS_ALL)
        fun insert(
            @Field("title") title: String,
            @Field("author") author: String
        ): Observable<Int>
    }
  

Кроме того, если вы хотите, чтобы другие ваши запросы выполнялись, даже если предыдущий выдает ошибку, выполните:

 val postObservable = networkApiAdapter.insert(localBook).onErrorReturn(_ -> SOME_INT_FLAG)
  

Обновить

Чтобы выполнить требуемую логику после отправки всех POST запросов, попробуйте:

     Observable.concat(listInserts)
        // complete this observable after all POST requests have been sent
        // not that it does not necesarily mean that all responses were 200, you should implement yourself what happens to items that were not successfully `POST`ed
        .take(listInserts.size)
        .doOnComplete { 
            // called when this observable completes, i.e. when all POST requests have been sent
            executeFinalActions()
        }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
            { s->
                println(s)
            },
            { err ->
                println(err)
            },
            { println("onComplete") }
        )
  

И реализовать executeFinalActions() функцию:

 private fun executeFinalActions() {
    realm.executeTransaction(Realm::deleteAll)
    networkApiAdapter.fetchAll()
    .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
            { books ->
                // todo: store books
            },
            { err ->
                println(err)
            }
        )
}
  

PS: возможно, вы захотите взглянуть, что Observable.take() делает: http://reactivex.io/documentation/operators/take.html

Комментарии:

1. Вы также знаете, как вызвать эту функцию после того, как все сообщения вернут 200? realm.executeTransaction(Realm::deleteAll) и после этого этот запрос GET, который возвращает наблюдаемый<Список<Книга>>? networkApiAdapter.fetchAll() Итак, последовательный поток: POSTs, deleteAll, fetchAll.

2. Спасибо! 🙏 Вы сэкономили мне много времени.