Как отправлять входящие обработчики NetSocket в разные потоки цикла событий?

#vert.x

#vert.x

Вопрос:

Я пытаюсь использовать Vertx для реализации TCP-сервера, принимающего входящие соединения, а затем обрабатывающего разные сокеты. Поскольку каждый сокет может обрабатываться независимо, обработчики, принадлежащие разным сокетам, должны выполняться в разных потоках цикла событий одновременно.

Согласно документу Vert.x,

Стандартным вершинам назначается поток цикла событий при их создании, и с этим циклом событий вызывается метод start. Когда вы вызываете любые другие методы, которые принимают обработчик в core API из цикла событий, тогда Vert.x гарантирует, что эти обработчики при вызове будут выполнены в том же цикле событий.

Я думаю, что этот фрагмент кода может печатать разные имена потоков:

 Vertx vertx = Vertx.vertx(); // The number of event loop threads is 2*core.
vertx.createNetServer().connectHandler(socket -> {
    vertx.deployVerticle(new AbstractVerticle() {
        @Override
        public void start() throws Exception {
            socket.handler(buffer -> {
                log.trace(socket.toString()   ": Socket Message");
                socket.close();
            });
        }
    });
}).listen(port);
  

Но, к сожалению, все обработчики были расположены в одном потоке.

 23:59:42.359 [vert.x-eventloop-thread-1] TRACE Server - io.vertx.core.net.impl.NetSocketImpl@253fa4f2: Socket Message
23:59:42.364 [vert.x-eventloop-thread-1] TRACE Server - io.vertx.core.net.impl.NetSocketImpl@465f1533: Socket Message
23:59:42.365 [vert.x-eventloop-thread-1] TRACE Server - io.vertx.core.net.impl.NetSocketImpl@5ab8dac: Socket Message
23:59:42.366 [vert.x-eventloop-thread-1] TRACE Server - io.vertx.core.net.impl.NetSocketImpl@5fc72993: Socket Message
23:59:42.367 [vert.x-eventloop-thread-1] TRACE Server - io.vertx.core.net.impl.NetSocketImpl@38ee66d7: Socket Message
23:59:42.368 [vert.x-eventloop-thread-1] TRACE Server - io.vertx.core.net.impl.NetSocketImpl@6a60a74: Socket Message
23:59:42.369 [vert.x-eventloop-thread-1] TRACE Server - io.vertx.core.net.impl.NetSocketImpl@5f3921e1: Socket Message
23:59:42.370 [vert.x-eventloop-thread-1] TRACE Server - io.vertx.core.net.impl.NetSocketImpl@39d41024: Socket Message
... more than 100  lines ...
  

Противоположный пример похож на этот сервер echo, написанный на BOOST.ASIO. Обработчики выполняются в разных потоках цикла событий, если для выполнения используется пул потоков io_service::run() .

Итак, мой вопрос в том, как запускать эти обработчики одновременно?

Ответ №1:

На самом деле, вы делаете что-то совершенно отличное от того, что вы намереваетесь.

Каждый раз, когда вы получаете соединение на своем сокете, вы запускаете нового участника,

Самый простой способ доказать, что:

 Vertx vertx = Vertx.vertx(); // The number of event loop threads is 2*core.
vertx.createHttpServer().requestHandler(request -> {

    vertx.deployVerticle(new AbstractVerticle() {

        String uuid = UUID.randomUUID().toString(); // Some random unique number

        @Override
        public void start() throws Exception {
            request.response().end(uuid   " "   Thread.currentThread().getName());
        }

    });
}).listen(8888);



vertx.setPeriodic(1000, r -> {
   System.out.println(vertx.deploymentIDs().size()); // Print verticles count every second
});
  

Я использую HTTPServer только потому, что его проще проверить в браузере.
Каким бы неправильным это ни было, вы все равно увидите, что должны получать разные потоки:

 fe931b18-89cc-4c6a-9d6a-8565bb1f1c12 vert.x-eventloop-thread-9
277330da-4df8-4e91-bd8f-82c0f62156d0 vert.x-eventloop-thread-11
bbd3207c-80a4-41d8-9be5-b40727badc84 vert.x-eventloop-thread-13
  

Теперь о том, как вы должны это сделать:

 // We create 10 workers
for (int i = 0; i < 10; i  ) {
    vertx.deployVerticle(new AbstractVerticle() {

        @Override
        public void start() {

            vertx.eventBus().consumer("processMessage", (request) -> {
                // Do something smart

                // Reply
                request.reply("I'm on thread "   Thread.currentThread().getName());
            });
        }
    });
}

// This is your handler
vertx.createHttpServer().requestHandler(request -> {
    // Only one server, that should dispatch events to workers as quickly as possible
    vertx.eventBus().send("processMessage", null, (response) -> {
        if (response.succeeded()) {
            request.response().end("Request :"   response.result().body().toString());
        }
        // Handle errors
    });
}).listen(8888);

vertx.setPeriodic(1000, r -> {
    System.out.println(vertx.deploymentIDs().size()); // Notice that number of workers doesn't change
});
  

Комментарии:

1. Я еще не понял. Разница между вашим кодом и моим заключается в расположении для развертывания рабочих вершин. Вы развертываете их снаружи, в то время как я развертываю их в обработчике. Означает ли это, что вершины, развернутые в каком-либо обработчике, унаследуют поток развертывания обработчика? И для чего это на самом деле предназначено, если да?

2. Вы создаете вершины для каждого запроса. Вы не должны этого делать. Вершины не наследуют поток. Если вы запустите мой пример, вы увидите, что, хотя я создаю вершины так же, как и вы, я каждый раз получаю разные потоки. Причина, по которой вы думаете, что находитесь в одном потоке, вероятно, заключается в том, как вы работаете со своим сетевым клиентом. Но это уже другая тема.

3. Почему этого не следует делать? В чем разница? Я не могу найти это в документах vert.x или, возможно, мне нужно прочитать его реализацию. 🙂

4. Потому что таким образом вы приводите к утечке памяти.

Ответ №2:

Невозможно определить, какой цикл событий Vert.x назначит каждой из ваших вершин без дополнительной информации (например, количество ядер ваших тестовых машин).

В любом случае, не очень хорошая идея развертывать verticle для каждого входящего соединения. Вертикали — это единицы развертывания в Vert.x . Обычно вы создаете по одной для каждой «функциональности».

Возвращаясь к вашему варианту использования, цель программирования, управляемого событиями, как раз в том, чтобы избежать использования потока для каждого соединения. Вы можете обрабатывать много одновременных подключений с помощью одного цикла событий. Если у вас на компьютере несколько ядер, вы можете развернуть несколько экземпляров вашей verticle, чтобы использовать их все (по 1 циклу событий на ядро).

 int processors = Runtime.getRuntime().availableProcessors();
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(TCPServerVerticle.class.getName(), new DeploymentOptions().setInstances(processors));

public class TCPServerVerticle extends AbstractVerticle {

  @Override
  public void start(Future<Void> startFuture) throws Exception {
    vertx.createNetServer().connectHandler(socket -> {
      socket.handler(buffer -> {
        log.trace(socket.toString()   ": Socket Message");
        socket.close();
      });
    }).listen(port, ar -> {
      if (ar.succeeded()) {
        startFuture.complete();
      } else {
        startFuture.fail(ar.cause());
      }
    });
  }
}
  

При совместном использовании TCP-сервера Vertx обработчики connect будут вызываться циклическим способом.