Почему WebFlux не возвращает значение сразу после его создания

#spring #observable #reactive-programming #spring-webflux

#spring #наблюдаемый #реактивное программирование #spring-webflux

Вопрос:

Насколько я понимаю, когда Spring WebFlux должен возвращать поток значений, он будет печатать и отправлять значения сразу по мере их создания. Но я провел несколько тестов, и он возвращает все значения только тогда, когда все они созданы (пример кода приведен ниже). Я делаю что-то не так, или это то, как оно должно себя вести, и я неправильно понял концепцию reactive?

серверная часть:

 @SpringBootApplication
public class ReactiveResponseExperimentApplication {

    public static void main(String[] args) {
        SpringApplication.run(ReactiveResponseExperimentApplication.class, args);
    }

}


@RestController
@RequiredArgsConstructor
class ProductController {

    private final BobikService bobikService;

    @CrossOrigin
    @PostMapping("/bobiks")
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<Bobik> createBobik(@RequestBody Bobik bobik) {
        return bobikService.save(bobik);
    }

    @CrossOrigin
    @GetMapping("/bobiks")
    public Flux<Bobik> getAllBobiks() {
        return bobikService.getAllBobiks();
    }

    @CrossOrigin
    @GetMapping("/bobiks/{id}")
    public Flux<Bobik> getBobikById(@PathVariable UUID id) {
        return bobikService.getBobikById(id);
    }
}

@Component
class BobikService {

    private HashMap<UUID, Bobik> dao = new HashMap<>();

    public Mono<Bobik> save(Bobik bobik) {
        bobik.setId(randomUUID());
        bobik.setStatus(Status.CREATED);
        dao.put(bobik.getId(), bobik);
        return Mono.just(bobik);
    }

    public Flux<Bobik> getAllBobiks() {
        return Flux.fromIterable(dao.values());
    }

    public Flux<Bobik> getBobikById(UUID id) {
        Bobik bobik = dao.get(id);
        if (bobik != null) {
            return Flux.create(fluxSink -> {
                boolean sendingInProgress = true;
                while(sendingInProgress) {
                    try {
                        Thread.sleep(4000);
                        if (bobik.getStatus() == Status.OPTIMISED) {
                            fluxSink.complete();
                            sendingInProgress = false;
                            System.out.println("---> sending completed");
                        } else {
                            bobik.setStatus(Status.values()[bobik.getStatus().ordinal()   1]);
                            System.out.println("---> send Bobik with status: "   bobik.getStatus());
                            fluxSink.next(bobik);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        fluxSink.error(e);
                    }
                }
            });
        }
        return Flux.empty();
    }
}

@Data
class Bobik {

    private UUID id;
    private String name;
    private Status status;

}

enum Status {

    CREATED,
    UPLOADED,
    CROPPED,
    OPTIMISED,

}

 

клиентская часть

     reactBobik() {
        console.debug(`--- going to react with bobik ${this.bobikId}`);
        this.http
            .get<Bobik>(`http://localhost:8080/bobiks/${this.bobikId}`)
            .subscribe(
                (nextBobik) => {
                    console.info(`--- next bobik is here with status ${nextBobik.status}`);
                    console.info(nextBobik);
                },
                (error) => {
                    console.error(`error occurred during fetching bobiks`, error);
                },
                () => {
                    console.info(`--- that's all, folks!`)
                }
            );
    }
 

журналы сервера

 ---> send Bobik with status: UPLOADED
---> send Bobik with status: CROPPED
---> send Bobik with status: OPTIMISED
---> sending completed
 

журналы клиента

 --- going to react with bobik 079b41eb-ebf3-4d7a-a520-1b9b4bfdae4c
--- next bobik is here with status undefined
Array(3) [ {…}, {…}, {…} ]
--- that's all, folks!
 

Комментарии:

1. Bobik bobik = dao.get(id); это блокирующий вызов, поэтому вы извлекаете все объекты из базы данных в блокирующем manor, а затем возвращаете их клиенту Array(3) [ {…}, {…}, {…} ] . Он будет отправлять элементы в потоке только в том случае, если вы используете неблокирующий драйвер базы данных, например JDBC, который, в свою очередь, создает поток значений из базы данных.