подписка после вставки данных в реактивные репозитории

#java #reactive-programming #spring-webflux #project-reactor #reactive

#java #реактивное программирование #spring-webflux #проект-реактор #реактивный

Вопрос:

Я полный новичок в реактивном мире, поэтому, просматривая некоторые примеры реактивных репозиториев, которые я нашел ниже:

 reactiverepository.save(employee).subscribe() // subscribing to make the publisher emit the data

reactiverepository.findAll() // no subscribe() ?
 

Я знаю, что для того, чтобы издатель (поток в приведенном выше случае) передавал данные, нам нужно subscribe() к нему. Но почему у нас нет таких же subscribe() вызовов в других методах CRUD, как в приведенном выше примере? Чего мне здесь не хватает?

Ответ №1:

реактор работает с концепцией producer и consumer . consumer subscribes Для producer и nothing will happen until you subscribe .

При использовании webflux , которое в основном spring-web , но реализовано с reactor концепцией, это the final consumer is usually the one subscribing так.

Вы можете думать об этом как о базе producer данных, а вызывающий client (веб-страница, приложение react, мобильное приложение) consumer — это, и когда клиент инициирует вызов серверной части, клиент subscribes к вашему server и server , в свою очередь subscribes , к database (помните? производитель, база данных создает данные).

Таким образом, ваш сервер в основном просто пересылает данные вызывающему клиенту, что означает, что обычно вы просто берете данные и возвращаете их клиенту, а платформа автоматически обрабатывает «подписку».

Обычно это означает, что вы НЕ должны подписываться в своем приложении, вместо этого вы должны выполнять цепочку вызовов и всегда сохранять цепочку неповрежденной, чтобы ваш сервер мог создавать элементы для клиента.

Итак, когда вы должны subscribe использовать свой сервер?

Ну, одной из причин может быть то, что у вас на сервере запущено задание, которое, возможно, извлекает данные с одного сервера и вставляет их в свою базу данных. Тогда ваш сервис является инициирующим клиентом, подписывающимся на другой сервер, другой сервер выдает вам элементы, вы получаете их и вставляете в свою базу данных.

но, например, в вашем примере:

 // first we fetch all, we filter, then we save, and we return back to the calling client keeping the chain intact, and no subscriptions
return reactiverepository.findAll()
    .filter(employes -> //some criteria)
    .flatMap(employee -> reactiverepository.save(employee));
 

Ответ №2:

Во-первых, вы должны знать о горячих и холодных издателях. Я бы посоветовал ознакомиться с этой статьей

Код, который вы пишете в мире реактивного программирования, представляет собой описание того, что вы хотите сделать (в мире реактора это называется сборкой, я бы посоветовал рассмотреть эту проблему, чтобы лучше понять разницу между сборкой и выполнением) после того, как данные будут готовы и не будут выполняться точно после выполнения вашего кода.

Второе, что следует иметь в виду, это то, что вы всегда находитесь в цепочке, и вам нужна только одна подписка в этой цепочке.

Например, предположим, что вы хотите получить все данные из репозитория и отфильтровать их с помощью одного поля, а затем вернуть их внешней стороне метода. Итак, вы должны сделать что-то вроде этого:

 return reactiveRepository.findAll().filter { //filter with one field } //or doing something different with your data when they fetched from DB
 

Вы должны заметить, что до тех пор, пока вы не подпишетесь на цепочку, данные не будут извлекаться из базы данных, и ваша цепочка ничего не будет делать.

Комментарии:

1. Спасибо!! В вашем примере filter() вы не вызвали subscribe() в цепочке, тогда фреймворк вызывает его? В моем вопросе я вызываю это явно

2. Я думаю, что лучший способ подписаться на реактивную цепочку — это когда вы хотите ответить клиенту. Пожалуйста, обратите внимание, что вам не нужно подписываться везде, где вы используете реактивную функцию (Mono или Flux). Вы можете продолжить цепочку и подготовить свой ответ, и завершить цепочку в конце, когда ответ будет готов, и помните, что подписка завершит цепочку, и после этого вы больше не в какой-либо цепочке.

3. Хорошо, по вашему мнению, если я удалю вызов subscribe() из save() выше, я ничего не получу в findAll()? что, если я сделаю «reactiverepository.findAll().subscribe ()», это сработает?

4. Вы могли бы что-то вроде этого: reactiveRepository.save(someObject).flatmap{reactiveRepository.findAll()}.subscribe() пожалуйста, учтите, что если вы оставите reactiveRepository.save() в покое и не будете использовать его в какой-либо другой цепочке, которая включает subscribe() в себя, это ничего не спасет.

5. Я просто попытался удалить subscribe() из save(), а также не добавил его ни в один из других методов crud. По-прежнему findAll() возвращает результат. Я думаю, что subscribe() вызывается внутри самой платформы.