Обработчик ответа шины событий Vertx не вызывается для определенного вызова

#java #asynchronous #vert.x #completable-future #vertx-eventbus

#java #асинхронный #vert.x #завершаемый-будущее #vertx-eventbus

Вопрос:

Я запускаю локальный экземпляр vertx. Маршрутизатор перенаправляет мои запросы на рабочую вершину, которая имеет следующий обработчик:

 protected Handler<Message<JsonObject>> handler1() {
    return msg -> {
        final RequestBody req = mapper.readValue(msg.body().encode(), RequestBody.class);
        processRequest(req, msg);
    }
}
 

Функция ProcessRequest принимает тело запроса, выполняет вызовы двух внешних служб, объединяет ответы и возвращает обратно клиенту.

 private processRequest(RequestBody req, Message<JsonObject> msg) {
    CompletableFuture<String> service1Response = getService1ResponseAsync(req); // Make async call to service 1
    String service2Response = getService2ResponseSync(req); // Make sync call to service 2

    ResponseBody response = aggregateResult(service1Response.join(), service2Response);  // Tag1
    msg.reply(mapper.writeValueAsString(response));
}

private CompletableFuture<String> getService1ResponseAsync(RequestBody req) {
    CompletableFuture<String> result = new CompletableFuture();
    // Below handler call makes GET call to service 1 using apache HTTP client and returns the response
    vertx.eventBus().request("serviceVerticleAddr1", mapper.writeValueAsString(req), new DeliveryOptions(), reply -> { // Tag2
        if (reply.succeeded())
            result.complete(reply.result().body().toString());
        else
            result.completeExceptionally(result.cause());
    }
}

 

Когда я нажимаю на вышеуказанный API, время ожидания моего запроса истекает. Поток из рабочего пула, назначенный для выполнения моего запроса, блокируется в Tag1 навсегда. При дальнейшей отладке я обнаружил, что обработчик ответа для вызова в Tag2 не вызывается.

Обработчики в service verticle (serviceVerticleAddr1) [т.Е. Tag2] возвращают правильный ответ для других API, использующих его, но для меня он блокируется. Может кто-нибудь, пожалуйста, помочь мне определить причину? Возникает ли какая-то тупиковая ситуация, когда поток, вызывающий vertx.EventBus().request [Tag2], начинает ожидать будущего завершения в service1Response.join() [Tag1] ?

Комментарии:

1. Скорее всего, выполнение зависает serviceVerticleAddr . Можете ли вы выполнить отладку serviceVerticleAddr1 , чтобы убедиться, что она успешно завершена?

Ответ №1:

Я думаю, что заблокирован из-за того, что отправитель не получает уведомления от потребителя о том, что сообщение было обработано. Я бы рекомендовал вам проверить внутри блока обработчика, зарегистрированного для получателя serviceVerticleAddr1 адреса, и убедиться, что он отвечает (уведомляет) отправителя о том, что запрошенное сообщение успешно обработано (или нет). Потребитель может выглядеть так

 vertx.eventBus().consumer("serviceVerticleAddr1", message -> {
  try{
     doSomething();
     message.reply("done");
  } catch(){
     message.fail(0, "fails");
  }
});
 

Таким образом, асинхронный обработчик отправителя будет уведомлен о том, что потребитель может обработать запрошенное сообщение