#java #httpclient #project-reactor #reactor-netty
#java #httpclient #проект-реактор #сеть реактора
Вопрос:
Я пытаюсь подключиться к очереди сообщений, запущенной в контейнере docker, используя сеть реактора. Я делаю это как автономный, не используя SpringFlux из-за проблем с зависимостями.
Из примеров в документации по сети реактора я увидел, что есть способ подключиться к серверу и получить ответ:
public static void main(String[] args) {
String response =
HttpClient.create()
.headers(h -> h.add("my header", my_header)
.get()
.uri(my_uri)
.responseContent()
.aggregate()
.asString()
.block();
}
но когда я пытаюсь впоследствии отобразить вывод через System.out.println(), ничего не происходит.
Я также пытался понять, как использовать:
Flux<V> response(BiFunction<HttpClientResponse,ByteBufFlux,Publisher<V>> receiver)
Но я не уверен точно, что делать. Я видел в документации, что есть класс с именем Connection, который использует TcpClient и имеет метод subscribe.
Я немного растерялся, можете ли вы указать мне правильное направление реализации этого в сети реактора без использования spring-flux?
Спасибо
Редактировать:
После некоторых экспериментов я получил это:
private Disposable subscribe() {
return HttpClient.create()
.headers(h -> h.add("my header", my_header)
.get()
.uri(my_uri)
.response((res, bytes) - > {
System.out.println(bytes.asString());
return bytes.asString();})
.subscribe();
}
Это дает мне FluxHandle, как я могу использовать это, чтобы фактически прочитать тело ответа?
Комментарии:
1. Пример с
block
должен сработать, если это не так, пожалуйста, откройте проблему. Также вы можете отслеживать сетевой трафик с добавлением.wiretap(true)
.
Ответ №1:
Итак, я выяснил, как подписаться и прочитать данные, полученные с сервера, и даже преобразовать данные в JSON, используя jackson
библиотеку, чтобы их было легче читать моему коду.
private Disposable subscribe() {
return HttpClient.create()
.headers(h -> h.add("my header", my_header)
.get()
.uri(my_uri)
.response((resp, bytes) -> {
return bytes.asString();
})
.subscribe(response -> {
try {
consumeData(new ObjectMapper()
.readValue(response, MyData.class));
} catch (IOException ex) {
System.out.println("ERROR converting to json: " ex);
}
});
}
кажется, что при использовании метода subscribe() я могу прослушивать входящие ответы и что-то с ними делать. Мне все еще нужно добавить способ закрытия соединения при остановке сервера или завершении работы очереди сообщений, чтобы клиент не зависал в несуществующей очереди сообщений.