Как получить сообщение mono после отправки другого сообщения

#java #project-reactor #reactor-netty

#java #проект-реактор #реактор-нетти

Вопрос:

Как отправить buf, а затем получить сообщение

Метод

 Mono<ByteBuf> send(ByteBuf buf){
    // how to send the buf then receive a msg
}
  

Я пытаюсь реализовать этот метод, отправив сообщение сообщения от исходящего соединения и получив сообщение сообщения от входящего, а затем вернув сообщение Mono. Но я могу получить сообщение только в методе then (Publisher). Похоже, он не может вернуться к моно данным

Я пробовал это.

 // the connecttion has been initialized before entering this method.

        Mono.just(buf)
                .doOnNext(data -> connection.outbound().sendObject(data).then().subscribe())
                .then(connection
                        .inbound()
                        .receiveObject()
                        .single()
                        .map(RpcDataPackage.class::cast)
                        .map(RpcDataPackage::getData)
                        .map(data -> {
                            try {
                                return resCodec.decode(data);
                            } catch (IOException e) {
                                throw new RpcRequestException(e);
                            }
                        })
                );
  

но оно будет блокироваться до истечения времени ожидания соединения

И я попробовал другой код. Я добавляю handle метод и помещаю ответ на карту. Тогда я могу получить Mono.fromSupply() с прерыванием цикла while map.get(key) != null .

Это заблокирует поток.

                 .handle(((nettyInbound, nettyOutbound) -> nettyInbound
                        .receiveObject()
                        .map(RpcDataPackage.class::cast)
                        .doOnNext(pkg -> {
                            String responseKey = "a key"

                            responseMap.put(responseKey, pkg);
                        })
                        .then()))
  

Ответ №1:

Вы не указываете, чего ожидаете. Смотрите Пример ниже, он отправляет некоторые данные, а затем получает то, что возвращает сервер.

     @Test
    public void test() {
        Connection connection =
                TcpClient.create()
                         .wiretap(true)
                         .host("example.com")
                         .port(80)
                         .connect()
                         .block();

        assertNotNull(connection);

        connection.outbound()
                  .sendObject(Unpooled.wrappedBuffer("test".getBytes()))
                  .then(connection.inbound()
                                  .receiveObject()
                                  .last()
                                  .doOnNext(System.out::println)
                                  .then())
                  .then()
                  .block();
    }
  

Комментарии:

1. Спасибо. Я пытаюсь реализовать свой метод, отправив сообщение msg из исходящего соединения и получив сообщение msg из входящего, а затем вернув сообщение Mono. Но я могу получить сообщение только в then(Publisher<Void>) методе. Похоже, он не может вернуться к моно-данным.

Ответ №2:

Я прочитал Mono javadoc и нашел MonoSink.

 Mono.create(monoSink -> {
  // some call
})
  

когда входящие получают ответ объекта, просто сделайте sink.success()

Ответ №3:

Вы должны использовать комбинацию NettyOutbound::then для прослушивания завершения записи и Mono::then для чтения вашего сообщения NettyInboud после записи.

   Mono<String> resposeMono = TcpClient.create()
            .connect()
            .flatMap(connection -> connection.outbound().sendString(Mono.just("Hello!"))
                    .then()
                    .then(connection.inbound().receive().aggregate().asString())
                    .doOnTerminate(connection::dispose));
  

Это приведет к записи «Привет!» на вывод, считыванию всех байтов из ввода в виде строки, а затем удалению соединения.