#java #asynchronous #vert.x #completable-future #vertx-eventbus
#java #асинхронный #vert.x #завершаемый-будущее #vertx-eventbus
Вопрос:
Я запускаю локальный экземпляр vertx. Маршрутизатор перенаправляет мои запросы на рабочую вершину, которая имеет следующий обработчик:
protected Handler<Message<JsonObject>> handler1() {
return msg -> {
final RequestBody req = mapper.readValue(msg.body().encode(), RequestBody.class);
processRequest(req, msg);
}
}
Функция ProcessRequest принимает тело запроса, выполняет вызовы двух внешних служб, объединяет ответы и возвращает обратно клиенту.
private processRequest(RequestBody req, Message<JsonObject> msg) {
CompletableFuture<String> service1Response = getService1ResponseAsync(req); // Make async call to service 1
String service2Response = getService2ResponseSync(req); // Make sync call to service 2
ResponseBody response = aggregateResult(service1Response.join(), service2Response); // Tag1
msg.reply(mapper.writeValueAsString(response));
}
private CompletableFuture<String> getService1ResponseAsync(RequestBody req) {
CompletableFuture<String> result = new CompletableFuture();
// Below handler call makes GET call to service 1 using apache HTTP client and returns the response
vertx.eventBus().request("serviceVerticleAddr1", mapper.writeValueAsString(req), new DeliveryOptions(), reply -> { // Tag2
if (reply.succeeded())
result.complete(reply.result().body().toString());
else
result.completeExceptionally(result.cause());
}
}
Когда я нажимаю на вышеуказанный API, время ожидания моего запроса истекает. Поток из рабочего пула, назначенный для выполнения моего запроса, блокируется в Tag1 навсегда. При дальнейшей отладке я обнаружил, что обработчик ответа для вызова в Tag2 не вызывается.
Обработчики в service verticle (serviceVerticleAddr1) [т.Е. Tag2] возвращают правильный ответ для других API, использующих его, но для меня он блокируется. Может кто-нибудь, пожалуйста, помочь мне определить причину? Возникает ли какая-то тупиковая ситуация, когда поток, вызывающий vertx.EventBus().request [Tag2], начинает ожидать будущего завершения в service1Response.join() [Tag1] ?
Комментарии:
1. Скорее всего, выполнение зависает
serviceVerticleAddr
. Можете ли вы выполнить отладкуserviceVerticleAddr1
, чтобы убедиться, что она успешно завершена?
Ответ №1:
Я думаю, что заблокирован из-за того, что отправитель не получает уведомления от потребителя о том, что сообщение было обработано. Я бы рекомендовал вам проверить внутри блока обработчика, зарегистрированного для получателя serviceVerticleAddr1
адреса, и убедиться, что он отвечает (уведомляет) отправителя о том, что запрошенное сообщение успешно обработано (или нет). Потребитель может выглядеть так
vertx.eventBus().consumer("serviceVerticleAddr1", message -> {
try{
doSomething();
message.reply("done");
} catch(){
message.fail(0, "fails");
}
});
Таким образом, асинхронный обработчик отправителя будет уведомлен о том, что потребитель может обработать запрошенное сообщение