Подключение к очереди сообщений сервера с помощью сети реактора

#java #httpclient #project-reactor #reactor-netty

#java #httpclient #проект-реактор #сеть реактора

Вопрос:

Я пытаюсь подключиться к очереди сообщений, запущенной в контейнере docker, используя сеть реактора. Я делаю это как автономный, не используя SpringFlux из-за проблем с зависимостями.

Из примеров в документации по сети реактора я увидел, что есть способ подключиться к серверу и получить ответ:

 public static void main(String[] args) {
        String response =
                HttpClient.create()
                          .headers(h -> h.add("my header", my_header)
                          .get()
                          .uri(my_uri)
                          .responseContent() 
                          .aggregate()       
                          .asString()        
                          .block();
    }
  

но когда я пытаюсь впоследствии отобразить вывод через System.out.println(), ничего не происходит.

Я также пытался понять, как использовать:

Flux<V> response(BiFunction<HttpClientResponse,ByteBufFlux,Publisher<V>> receiver)

Но я не уверен точно, что делать. Я видел в документации, что есть класс с именем Connection, который использует TcpClient и имеет метод subscribe.

Я немного растерялся, можете ли вы указать мне правильное направление реализации этого в сети реактора без использования spring-flux?

Спасибо

Редактировать:

После некоторых экспериментов я получил это:

 private Disposable subscribe() {
    return HttpClient.create()
               .headers(h -> h.add("my header", my_header)
               .get()
               .uri(my_uri)
               .response((res, bytes) - > {
                   System.out.println(bytes.asString());
                   return bytes.asString();})
               .subscribe();
}
  

Это дает мне FluxHandle, как я могу использовать это, чтобы фактически прочитать тело ответа?

Комментарии:

1. Пример с block должен сработать, если это не так, пожалуйста, откройте проблему. Также вы можете отслеживать сетевой трафик с добавлением .wiretap(true) .

Ответ №1:

Итак, я выяснил, как подписаться и прочитать данные, полученные с сервера, и даже преобразовать данные в JSON, используя jackson библиотеку, чтобы их было легче читать моему коду.

 private Disposable subscribe() {
    return HttpClient.create()
               .headers(h -> h.add("my header", my_header)
               .get()
               .uri(my_uri)
               .response((resp, bytes) -> {
                    return bytes.asString();
                })
                .subscribe(response -> {
                    try {
                        consumeData(new ObjectMapper()
                                .readValue(response, MyData.class));
                    } catch (IOException ex) {
                        System.out.println("ERROR converting to json: "   ex);
                    }
                    });
}
  

кажется, что при использовании метода subscribe() я могу прослушивать входящие ответы и что-то с ними делать. Мне все еще нужно добавить способ закрытия соединения при остановке сервера или завершении работы очереди сообщений, чтобы клиент не зависал в несуществующей очереди сообщений.