#java #project-reactor #reactor-netty
#java #проект-реактор #реактор-нетти
Вопрос:
Как отправить buf, а затем получить сообщение
Метод
Mono<ByteBuf> send(ByteBuf buf){
// how to send the buf then receive a msg
}
Я пытаюсь реализовать этот метод, отправив сообщение сообщения от исходящего соединения и получив сообщение сообщения от входящего, а затем вернув сообщение Mono. Но я могу получить сообщение только в методе then (Publisher). Похоже, он не может вернуться к моно данным
Я пробовал это.
// the connecttion has been initialized before entering this method.
Mono.just(buf)
.doOnNext(data -> connection.outbound().sendObject(data).then().subscribe())
.then(connection
.inbound()
.receiveObject()
.single()
.map(RpcDataPackage.class::cast)
.map(RpcDataPackage::getData)
.map(data -> {
try {
return resCodec.decode(data);
} catch (IOException e) {
throw new RpcRequestException(e);
}
})
);
но оно будет блокироваться до истечения времени ожидания соединения
И я попробовал другой код. Я добавляю handle
метод и помещаю ответ на карту. Тогда я могу получить Mono.fromSupply()
с прерыванием цикла while map.get(key) != null
.
Это заблокирует поток.
.handle(((nettyInbound, nettyOutbound) -> nettyInbound
.receiveObject()
.map(RpcDataPackage.class::cast)
.doOnNext(pkg -> {
String responseKey = "a key"
responseMap.put(responseKey, pkg);
})
.then()))
Ответ №1:
Вы не указываете, чего ожидаете. Смотрите Пример ниже, он отправляет некоторые данные, а затем получает то, что возвращает сервер.
@Test
public void test() {
Connection connection =
TcpClient.create()
.wiretap(true)
.host("example.com")
.port(80)
.connect()
.block();
assertNotNull(connection);
connection.outbound()
.sendObject(Unpooled.wrappedBuffer("test".getBytes()))
.then(connection.inbound()
.receiveObject()
.last()
.doOnNext(System.out::println)
.then())
.then()
.block();
}
Комментарии:
1. Спасибо. Я пытаюсь реализовать свой метод, отправив сообщение msg из исходящего соединения и получив сообщение msg из входящего, а затем вернув сообщение Mono. Но я могу получить сообщение только в
then(Publisher<Void>)
методе. Похоже, он не может вернуться к моно-данным.
Ответ №2:
Я прочитал Mono javadoc и нашел MonoSink.
Mono.create(monoSink -> {
// some call
})
когда входящие получают ответ объекта, просто сделайте sink.success()
Ответ №3:
Вы должны использовать комбинацию NettyOutbound::then для прослушивания завершения записи и Mono::then для чтения вашего сообщения NettyInboud после записи.
Mono<String> resposeMono = TcpClient.create()
.connect()
.flatMap(connection -> connection.outbound().sendString(Mono.just("Hello!"))
.then()
.then(connection.inbound().receive().aggregate().asString())
.doOnTerminate(connection::dispose));
Это приведет к записи «Привет!» на вывод, считыванию всех байтов из ввода в виде строки, а затем удалению соединения.