Сброс входного потока источника конечной точки весеннего отдыха

#java #spring #stream

Вопрос:

у меня есть приложение, которое отображает содержимое потока. Этот поток обеспечивается за счет весеннего отдыха. Содержимое потока нечасто передается из другого потока.

Это уже работает, но единственное, с чем у меня возникают проблемы, — это «очистка» потока. В настоящее время я вижу контент только тогда, когда поток закрыт, но я хочу отображать его в режиме реального времени.

Я проверил это, вызвав конечную точку через chrome/firefox, и (полное) содержимое появляется только после закрытия потока (страница больше не загружается). Также в моем приложении контент отображается только в конце, а не в то же время.

 @GetMapping("/session")
public ResponseEntity<InputStreamResource> openSession(){
    var inLog = new PipedInputStream();
    var logWritable = new PipedOutputStream(inLog);

    spawnThreadAndPushMessages(logWritable);

    return ResponseEntity.ok()
        .contentType(MediaType.TEXT_PLAIN)
        .body(new InputStreamResource(inLog));
}
 

Я пишу в поток Pipedoutput (доступный для записи в журнал) следующим образом:

 spawnThreadAndPushMessages(OutputStream logWritable){
    //thread stuff etc omitted for clarity
    logWritable.write("test".getBytes());
    logWritable.flush();
}
 

Что здесь происходит не так?

Ответ №1:

так что я, наконец, понял это:

 .contentType(MediaType.TEXT_EVENT_STREAM)
 

Мне пришлось переосмыслить свою архитектуру. Теперь я пишу в параллельный список в другом потоке. Моя конечная точка теперь выглядит так:

 @GetMapping("/session")
public ResponseEntity<StreamingResponseBody> openSession(){
var shutdown = new AtomicBoolean(false);
var tobeFlushed = new ConcurrentLinkedQueue<String>();
//thread management etc
createThreadAndWriteToList(tobeFlushed, shutdown);

return ResponseEntity.ok()
    .contentType(MediaType.TEXT_EVENT_STREAM)
    .body(outputStream -> {
      while (!shutdown.get()) {
        tobeFlushed.forEach(x -> {
          try {
            outputStream.write((x "n").getBytes(StandardCharsets.UTF_8));
            outputStream.flush();
          } catch (IOException e) {
            e.printStackTrace();
          }
        });
        tobeFlushed.clear();
    }
    System.out.println("Finished request!! ------------------------");
  });
}