#mongodb #rx-java #rx-java2 #reactive #reactivex
#mongodb #rx-java #rx-java2 #реактивный #reactivex
Вопрос:
Контекст: я впервые работаю с RxJava.
При кодировании с помощью RxJava для (1) выбора документов из базы данных NoSQL и (2) вставки в NoSQL (например, MongoDB), какой рекомендуемый эмиттер учитывает реактивный стек?
Например, должен ли я обычно предпочитать использовать Flowable для чтения и Single для сохранения?
Этот код отлично работает для сохранения сообщения, полученного из темы Kafka, в MongoDB, но мне интересно, работает ли io.reactivex.Single — действительно лучший способ добиться этого.
import com.mongodb.client.result.InsertOneResult
import com.mongodb.reactivestreams.client.MongoClient
import com.mongodb.reactivestreams.client.MongoCollection
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.messaging.annotation.Body
import io.reactivex.Observable
import io.reactivex.Single
import javax.inject.Inject
import io.reactivex.functions.Function
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
class DebitConsumer {
@Inject
lateinit var mongoClient: MongoClient
@Topic("debit")
fun receive(@KafkaKey key: String, name: String) {
save(key.toInt(), name)
}
private fun save( id: Int?,name: String?) {
val debitMessage = DebitMessage(id, name)
Single
.fromPublisher(getCollection().insertOne(debitMessage))
.map<DebitMessage>(Function<InsertOneResult, DebitMessage> { debitMessage })
.subscribe()
}
private fun getCollection(): MongoCollection<DebitMessage?> {
return mongoClient
.getDatabase("mydb")
.getCollection("mycollection", DebitMessage::class.java)
}
}
Я пришел из Spring Data, который немного прямолинейен в реактивном мире, и, по не относящимся к делу причинам для этого вопроса, я не буду использовать Spring, и я ищу лучшие практики при записи / чтении данных в реактивном мире / без блокировки / противодавления.
Комментарии:
1. Я думаю, что вы не получаете никакой пользы от реактивного шаблона здесь из-за создания нового сингла при каждом вызове
save
. Вы стремитесь помещать каждый вызов сохранения в один поток?2. @CarsonHolzheimer, спасибо. Ну, моя главная цель здесь — использовать неблокирующий способ сохранения данных, полученных от Kafka. Под «неблокирующим» я подразумеваю избегание больших ресурсов, таких как поток, остановленный до тех пор, пока он не будет завершен. Можете ли вы либо предложить, как бы вы достигли этого с помощью RxJava, либо указать причины, если я делаю что-то глупое? Что касается вашего вопроса, нет, я не стремлюсь помещать каждый вызов сохранения в один поток. Вы можете sse я подписываюсь немедленно.
3. В качестве простой аналогии, в прошлом я кодировал аналогичную идею с Spring Data Reactive с помощью mono … subscribe(), предполагая, что я спроектировал его так, чтобы потреблять меньше ресурсов (потоков). Надеюсь, я не сделал какой-нибудь глупости, потому что код уже продвигается. Итак, вы видите какое-то странное ожидание, пожалуйста, сообщите об этом
4. Я думаю, что ваш код в порядке. Просто он не использует какие-либо функции реактивных потоков, он просто действует как обратный вызов. Я вижу, что Mongo удалил свой асинхронный драйвер в пользу драйвера реактивных потоков, поэтому я думаю, что для получения асинхронного поведения вы должны использовать этот драйвер так, как у вас есть. Кстати, я совершенно уверен, что строка
.map<DebitMessage>...
ничего не делает.5. Чтобы уточнить, он использует функцию драйвера реактивных потоков MongoDBs, который переносит операции записи в фоновый поток и, следовательно, обеспечивает неблокирующую запись. Однако на самом деле это не функция «реактивных потоков», а просто асинхронная запись в другой поток. Смотрите эту ссылку, почему я был в замешательстве: mongodb.github.io/mongo-java-driver-reactivestreams/1.13
Ответ №1:
Ваш код выглядит нормально. A Single
имеет смысл сохранить, так как вы получите обратно только один результат. Flowable
имеет смысл для чтения, но на самом деле выбор остается за вами и вашим приложением. Вы хотите прослушивать изменения базы данных через потоки изменений? Затем вам придется использовать Flowable
, чтобы вы могли реагировать на несколько обновлений в потоке. Это может быть хорошей практикой для использования Flowable
, даже если в настоящее время вы не прослушиваете несколько обновлений, но вы думаете, что сможете это сделать в будущем.
Если вы уверены, что хотите иметь дело только с 1 событием, используйте Single
. Это сэкономит вам некоторые усилия в коде вашего приложения, связанные с возможностью нескольких событий.