RxJava: каков рекомендуемый эмиттер для асинхронного сохранения и извлечения данных из NoSQL DB (т.Е. реактивный подход)

#mongodb #rx-java #rx-java2 #reactive #reactivex

#mongodb #rx-java #rx-java2 #реактивный #reactivex

Вопрос:

Контекст: я впервые работаю с RxJava.

При кодировании с помощью RxJava для (1) выбора документов из базы данных NoSQL и (2) вставки в NoSQL (например, MongoDB), какой рекомендуемый эмиттер учитывает реактивный стек?

Например, должен ли я обычно предпочитать использовать Flowable для чтения и Single для сохранения?

Этот код отлично работает для сохранения сообщения, полученного из темы Kafka, в MongoDB, но мне интересно, работает ли io.reactivex.Single — действительно лучший способ добиться этого.

 import com.mongodb.client.result.InsertOneResult
import com.mongodb.reactivestreams.client.MongoClient
import com.mongodb.reactivestreams.client.MongoCollection
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.messaging.annotation.Body
import io.reactivex.Observable
import io.reactivex.Single
import javax.inject.Inject
import io.reactivex.functions.Function

@KafkaListener(offsetReset = OffsetReset.EARLIEST)
class DebitConsumer {

    @Inject
    lateinit var mongoClient: MongoClient

    @Topic("debit")
    fun receive(@KafkaKey key: String, name: String) {

        save(key.toInt(), name)

    }

    private fun save( id: Int?,name: String?) {
        val debitMessage =  DebitMessage(id, name)
        Single
                .fromPublisher(getCollection().insertOne(debitMessage))
                .map<DebitMessage>(Function<InsertOneResult, DebitMessage> { debitMessage })
                .subscribe()
    }

    private fun getCollection(): MongoCollection<DebitMessage?> {
        return mongoClient
                .getDatabase("mydb")
                .getCollection("mycollection", DebitMessage::class.java)
    }
}
 

Я пришел из Spring Data, который немного прямолинейен в реактивном мире, и, по не относящимся к делу причинам для этого вопроса, я не буду использовать Spring, и я ищу лучшие практики при записи / чтении данных в реактивном мире / без блокировки / противодавления.

Комментарии:

1. Я думаю, что вы не получаете никакой пользы от реактивного шаблона здесь из-за создания нового сингла при каждом вызове save . Вы стремитесь помещать каждый вызов сохранения в один поток?

2. @CarsonHolzheimer, спасибо. Ну, моя главная цель здесь — использовать неблокирующий способ сохранения данных, полученных от Kafka. Под «неблокирующим» я подразумеваю избегание больших ресурсов, таких как поток, остановленный до тех пор, пока он не будет завершен. Можете ли вы либо предложить, как бы вы достигли этого с помощью RxJava, либо указать причины, если я делаю что-то глупое? Что касается вашего вопроса, нет, я не стремлюсь помещать каждый вызов сохранения в один поток. Вы можете sse я подписываюсь немедленно.

3. В качестве простой аналогии, в прошлом я кодировал аналогичную идею с Spring Data Reactive с помощью mono … subscribe(), предполагая, что я спроектировал его так, чтобы потреблять меньше ресурсов (потоков). Надеюсь, я не сделал какой-нибудь глупости, потому что код уже продвигается. Итак, вы видите какое-то странное ожидание, пожалуйста, сообщите об этом

4. Я думаю, что ваш код в порядке. Просто он не использует какие-либо функции реактивных потоков, он просто действует как обратный вызов. Я вижу, что Mongo удалил свой асинхронный драйвер в пользу драйвера реактивных потоков, поэтому я думаю, что для получения асинхронного поведения вы должны использовать этот драйвер так, как у вас есть. Кстати, я совершенно уверен, что строка .map<DebitMessage>... ничего не делает.

5. Чтобы уточнить, он использует функцию драйвера реактивных потоков MongoDBs, который переносит операции записи в фоновый поток и, следовательно, обеспечивает неблокирующую запись. Однако на самом деле это не функция «реактивных потоков», а просто асинхронная запись в другой поток. Смотрите эту ссылку, почему я был в замешательстве: mongodb.github.io/mongo-java-driver-reactivestreams/1.13

Ответ №1:

Ваш код выглядит нормально. A Single имеет смысл сохранить, так как вы получите обратно только один результат. Flowable имеет смысл для чтения, но на самом деле выбор остается за вами и вашим приложением. Вы хотите прослушивать изменения базы данных через потоки изменений? Затем вам придется использовать Flowable , чтобы вы могли реагировать на несколько обновлений в потоке. Это может быть хорошей практикой для использования Flowable , даже если в настоящее время вы не прослушиваете несколько обновлений, но вы думаете, что сможете это сделать в будущем.

Если вы уверены, что хотите иметь дело только с 1 событием, используйте Single . Это сэкономит вам некоторые усилия в коде вашего приложения, связанные с возможностью нескольких событий.