Параллельный ввод-вывод на одноядерном Java 11

#java #multithreading #concurrency #aws-lambda

#java #многопоточность #параллелизм #aws-lambda

Вопрос:

После использования таких языков, как Erlang и других, которые имеют легкие параллельные процессы, мне трудно понять, как это переводится на Java. Учитывая, что я использую одноядерный компьютер, есть ли способ выполнить несколько одновременных операций ввода-вывода (http)?

Я нашел следующее ExecutorService и CompletableFuture . Проблема, с которой я сталкиваюсь, заключается в том, что они основаны на пуле потоков. Пул потоков по умолчанию использует core # — 1, который на одноядерном компьютере, который я использую, НЕ имеет параллелизма. Было бы решением просто предоставить пользовательский Executor интерфейс с большим количеством потоков? или есть более идиоматический способ параллелизма, связанного с вводом-выводом, на одноядерных машинах в Java?

Я запускаю этот код на AWS Lambda с одним ядром.

Ответ №1:

«Пул потоков по умолчанию использует core # — 1, который на одноядерном компьютере, который я использую, НЕ имеет параллелизма».— Почему? Параллельная программа может очень хорошо работать на одноядерном компьютере. Это не имеет ничего общего с параллелизмом.

Когда поток Java ожидает ввода-вывода, планировщик ядра переместит его в очередь ожидания, и будет запущен какой-либо другой поток, требующий процессорного времени. Таким образом, вы можете создать пул потоков с таким количеством потоков, сколько захотите, а планировщик позаботится о параллелизме. И это будет нормально работать даже на одноядерном компьютере.

Единственным ограничением здесь является количество потоков, которые вы будете создавать. Размер стека потока по умолчанию варьируется от ч / б 512K до 1M . Так что это не очень хорошо масштабируется, и в какой-то момент у вас закончатся потоки. В моей системе я мог бы создать около 5 тыс. из них. Такие языки, как Go, управляют этим путем мультиплексирования нескольких подпрограмм в ограниченном количестве потоков ядра. Для этого требуется планирование с помощью среды выполнения Go.

Если вы хотите облегчить это, вам следует изучить NIO . Я написал быструю программу, которую вы можете использовать, чтобы узнать, сколько одновременных подключений вы действительно можете поддерживать таким образом. Это должно выполняться как есть после импорта:

 public class ConcurrentBlockingServer {

  private ExecutorService pool = Executors.newCachedThreadPool();

  public static void main(String[] args) {
    ConcurrentBlockingServer bs = new ConcurrentBlockingServer();
    try {
      bs.listen();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  private void listen() throws IOException {
    int connectionId = 0;
    ServerSocket ss = new ServerSocket(8080);
    while (true) {
      Socket s = ss.accept(); // blocking call, never null
      System.out.println("Connection: "   (  connectionId));
      process(s);
    }
  }

  private void process(Socket s) {
    Runnable task =
        () -> {
          try (InputStream is = s.getInputStream();
              OutputStream os = s.getOutputStream()) {
            int data;
            // -1 is EOF, .read() is blocking
            while (-1 != (data = is.read())) {
              os.write(flipCase(data));
              os.flush();
            }
          } catch (IOException e) {
            e.printStackTrace();
          }
        };
    pool.submit(task);
  }

  private int flipCase(int input) {
    if (input >= 65 amp;amp; input <= 90) {
      return input   32;
    } else if (input >= 97 amp;amp; input <= 122) {
      return input - 32;
    } else {
      return input;
    }
  }
}
  

Запустите эту программу и посмотрите, сколько соединений вы могли бы установить.

 public class RogueClient {

  private static long noClients = 9000;

  public static void main(String[] args) {
    for (int i = 0; i < noClients; i  ) {
      try {
        new Socket("localhost", 8080);
        System.out.println("Connection No: "   i);
      } catch (IOException e) {
        System.err.println("Exception: "   e.getMessage()   ", for connection: "   i);
      }
    }
    try {
      Thread.sleep(Integer.MAX_VALUE);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}
  

Редактировать: размер пула должен зависеть от характера вашей программы. Если это задача, связанная с вводом-выводом, вы можете продолжить и создать много потоков. Но для программ, привязанных к процессору, количество потоков должно быть равно количеству ядер.

Ответ №2:

Потоки являются основным (и единственным) механизмом параллелизма в Java. ExecutorService имеет методы для создания пулов потоков с произвольным количеством потоков; также CompletableFuture s не ограничены ForkJoinPool.commonPool . Те, которые снабжены количеством потоков = количеством ядер, являются просто удобными методами для задач, связанных с процессором.

Ответ №3:

Для работы с привязкой к вводу-выводу вы могли бы выделить больше потоков, как вы уже упоминали, пока они выполняют ввод-вывод, они блокируются, поэтому другие все еще могут использовать процессор.

Вы также можете взглянуть на Project Reactor, который может пригодиться, если вы не хотите вручную обрабатывать потоки с помощью этого конкретного планировщика: https://projectreactor.io/docs/core/release/api/reactor/core/scheduler/Schedulers.html#boundedElastic—