#spring #observable #reactive-programming #spring-webflux
#spring #наблюдаемый #реактивное программирование #spring-webflux
Вопрос:
Насколько я понимаю, когда Spring WebFlux должен возвращать поток значений, он будет печатать и отправлять значения сразу по мере их создания. Но я провел несколько тестов, и он возвращает все значения только тогда, когда все они созданы (пример кода приведен ниже). Я делаю что-то не так, или это то, как оно должно себя вести, и я неправильно понял концепцию reactive?
серверная часть:
@SpringBootApplication
public class ReactiveResponseExperimentApplication {
public static void main(String[] args) {
SpringApplication.run(ReactiveResponseExperimentApplication.class, args);
}
}
@RestController
@RequiredArgsConstructor
class ProductController {
private final BobikService bobikService;
@CrossOrigin
@PostMapping("/bobiks")
@ResponseStatus(HttpStatus.CREATED)
public Mono<Bobik> createBobik(@RequestBody Bobik bobik) {
return bobikService.save(bobik);
}
@CrossOrigin
@GetMapping("/bobiks")
public Flux<Bobik> getAllBobiks() {
return bobikService.getAllBobiks();
}
@CrossOrigin
@GetMapping("/bobiks/{id}")
public Flux<Bobik> getBobikById(@PathVariable UUID id) {
return bobikService.getBobikById(id);
}
}
@Component
class BobikService {
private HashMap<UUID, Bobik> dao = new HashMap<>();
public Mono<Bobik> save(Bobik bobik) {
bobik.setId(randomUUID());
bobik.setStatus(Status.CREATED);
dao.put(bobik.getId(), bobik);
return Mono.just(bobik);
}
public Flux<Bobik> getAllBobiks() {
return Flux.fromIterable(dao.values());
}
public Flux<Bobik> getBobikById(UUID id) {
Bobik bobik = dao.get(id);
if (bobik != null) {
return Flux.create(fluxSink -> {
boolean sendingInProgress = true;
while(sendingInProgress) {
try {
Thread.sleep(4000);
if (bobik.getStatus() == Status.OPTIMISED) {
fluxSink.complete();
sendingInProgress = false;
System.out.println("---> sending completed");
} else {
bobik.setStatus(Status.values()[bobik.getStatus().ordinal() 1]);
System.out.println("---> send Bobik with status: " bobik.getStatus());
fluxSink.next(bobik);
}
} catch (InterruptedException e) {
e.printStackTrace();
fluxSink.error(e);
}
}
});
}
return Flux.empty();
}
}
@Data
class Bobik {
private UUID id;
private String name;
private Status status;
}
enum Status {
CREATED,
UPLOADED,
CROPPED,
OPTIMISED,
}
клиентская часть
reactBobik() {
console.debug(`--- going to react with bobik ${this.bobikId}`);
this.http
.get<Bobik>(`http://localhost:8080/bobiks/${this.bobikId}`)
.subscribe(
(nextBobik) => {
console.info(`--- next bobik is here with status ${nextBobik.status}`);
console.info(nextBobik);
},
(error) => {
console.error(`error occurred during fetching bobiks`, error);
},
() => {
console.info(`--- that's all, folks!`)
}
);
}
журналы сервера
---> send Bobik with status: UPLOADED
---> send Bobik with status: CROPPED
---> send Bobik with status: OPTIMISED
---> sending completed
журналы клиента
--- going to react with bobik 079b41eb-ebf3-4d7a-a520-1b9b4bfdae4c
--- next bobik is here with status undefined
Array(3) [ {…}, {…}, {…} ]
--- that's all, folks!
Комментарии:
1.
Bobik bobik = dao.get(id);
это блокирующий вызов, поэтому вы извлекаете все объекты из базы данных в блокирующем manor, а затем возвращаете их клиентуArray(3) [ {…}, {…}, {…} ]
. Он будет отправлять элементы в потоке только в том случае, если вы используете неблокирующий драйвер базы данных, например JDBC, который, в свою очередь, создает поток значений из базы данных.