#java #mongodb #mongodb-java #reactive-streams #reactive-mongo-java
#java #mongodb #mongodb-java #реактивные потоки #реактивный-монго-java
Вопрос:
У меня есть куча Document
в a Collection
, и я хотел бы получить их все. Это моя ситуация:
- Я использую драйвер Java Reactive Streams
- Я использую
CodecRegistry
, чтобыDocument
десериализовать мойPojo
Проблема в том, что все варианты find()
метода возвращают a FindPublisher<Pojo>
и излишне говорить, что любой вид выделения значения приведет к возврату Pojo
объекта. Я хочу, чтобы a List<Pojo>
или a Set<Pojo>
возвращались. Как мне вернуть a List<Pojo
или a Set<Pojo>
?
В быстром запуске они используют find().first()
which возвращает один Document
и, следовательно Pojo
, имеет смысл один объект. Нет примера для возврата нескольких Document
.
Комментарии:
1. Для пояснения: можете ли вы показать, как вы получаете доступ к коллекции? Если я использую
database.getCollection("pojos", Pojo.class)
, то я могу использоватьList<Pojo> pojos = new ArrayList<>();
followed bycollection.find().forEach(a -> pojos.add(a));
. Конечно, это предполагает, что мой кодек настроен правильно. Но в моем подходе используетсяFindIterable
неFindPublisher
— так что, возможно, мне не хватает реактивной части головоломки.2. @andrewjames Я знаю об этом
FindIterable
подходе. Но я не могу найти примерFindPublisher
подхода. Если мне нужно будет использовать этотfindIterable
подход и адаптироваться кfindPublisher
нему, это будет что-то вродеSingle.fromPublisher(collection.find()).map(pojo -> list.add(pojo)....)
, а затемlist
каким-то образом вернуть. Я следую настройке доступа к коллекции согласно ссылке, указанной в моем вопросе, но с драйвером реактивных потоков.
Ответ №1:
Например, используя драйвер реактивных потоков MongoDB и RxJava:
Publisher<Document> publisher = collection.find();
List<Document> docs = Flowable.fromPublisher(publisher)
.blockingStream()
.collect(Collectors.toList());
[РЕДАКТИРОВАТЬ ДОБАВИТЬ]
Вы можете использовать неблокирующий вызов, например:
List<Document> docs = new ArrayList<>();
Observable.fromPublisher(publisher).subscribe(doc -> docs.add(doc));
Комментарии:
1. Прежде всего, я хочу, чтобы он оставался реактивным. Я не хочу блокировать. В этом весь смысл использования драйвера реактивных потоков. Во-вторых, нет смысла использовать конструкцию с учетом противодавления, например
Flowable
, когда вы собираетесь блокировать в любом случае. В-третьих,blockingStream()
Flowable
например, в RxJava 3 нет метода.2.(1) Вы можете использовать неблокирующие методы из API (2) Существует метод RxJava3 Flowable#blockingStream() (3) Вы можете использовать не наблюдаемое противодавление
3. 1. Пожалуйста, обратитесь к комментариям под вопросом, чтобы понять реальную проблему. Если бы я получалDocument
тогда, я смог бы получить объекты, которые мне нужны. Но я уже сопоставлен с aPojo
, и это то, что я получаю, а не aDocument
. 2. Я использую io.reactivex. Текучие , а не те, которые вы используете. 3. Также, пожалуйста, обратитесь к комментариям под вопросом. Это поможет вам получить лучшую картину.4. Вы можете использовать объект Pojo по вашему выбору вместо документа.
5. Мои
find()
возвратыFindPublisher<Pojo>
и нетFindPublisher<Document>
. Вот где у меня проблемы. Потому что даже acollection.find()
возвращает aFindPublisher<Pojo>
. Как я должен это интерпретировать? Кроме того, я возвращаю что-то вродеSingle.fromPublisher(coll.find())....
напрямую