#java #multithreading #concurrency #aws-lambda
#java #многопоточность #параллелизм #aws-lambda
Вопрос:
После использования таких языков, как Erlang и других, которые имеют легкие параллельные процессы, мне трудно понять, как это переводится на Java. Учитывая, что я использую одноядерный компьютер, есть ли способ выполнить несколько одновременных операций ввода-вывода (http)?
Я нашел следующее ExecutorService
и CompletableFuture
. Проблема, с которой я сталкиваюсь, заключается в том, что они основаны на пуле потоков. Пул потоков по умолчанию использует core # — 1, который на одноядерном компьютере, который я использую, НЕ имеет параллелизма. Было бы решением просто предоставить пользовательский Executor
интерфейс с большим количеством потоков? или есть более идиоматический способ параллелизма, связанного с вводом-выводом, на одноядерных машинах в Java?
Я запускаю этот код на AWS Lambda с одним ядром.
Ответ №1:
«Пул потоков по умолчанию использует core # — 1, который на одноядерном компьютере, который я использую, НЕ имеет параллелизма».— Почему? Параллельная программа может очень хорошо работать на одноядерном компьютере. Это не имеет ничего общего с параллелизмом.
Когда поток Java ожидает ввода-вывода, планировщик ядра переместит его в очередь ожидания, и будет запущен какой-либо другой поток, требующий процессорного времени. Таким образом, вы можете создать пул потоков с таким количеством потоков, сколько захотите, а планировщик позаботится о параллелизме. И это будет нормально работать даже на одноядерном компьютере.
Единственным ограничением здесь является количество потоков, которые вы будете создавать. Размер стека потока по умолчанию варьируется от ч / б 512K
до 1M
. Так что это не очень хорошо масштабируется, и в какой-то момент у вас закончатся потоки. В моей системе я мог бы создать около 5 тыс. из них. Такие языки, как Go, управляют этим путем мультиплексирования нескольких подпрограмм в ограниченном количестве потоков ядра. Для этого требуется планирование с помощью среды выполнения Go.
Если вы хотите облегчить это, вам следует изучить NIO
. Я написал быструю программу, которую вы можете использовать, чтобы узнать, сколько одновременных подключений вы действительно можете поддерживать таким образом. Это должно выполняться как есть после импорта:
public class ConcurrentBlockingServer {
private ExecutorService pool = Executors.newCachedThreadPool();
public static void main(String[] args) {
ConcurrentBlockingServer bs = new ConcurrentBlockingServer();
try {
bs.listen();
} catch (IOException e) {
e.printStackTrace();
}
}
private void listen() throws IOException {
int connectionId = 0;
ServerSocket ss = new ServerSocket(8080);
while (true) {
Socket s = ss.accept(); // blocking call, never null
System.out.println("Connection: " ( connectionId));
process(s);
}
}
private void process(Socket s) {
Runnable task =
() -> {
try (InputStream is = s.getInputStream();
OutputStream os = s.getOutputStream()) {
int data;
// -1 is EOF, .read() is blocking
while (-1 != (data = is.read())) {
os.write(flipCase(data));
os.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
};
pool.submit(task);
}
private int flipCase(int input) {
if (input >= 65 amp;amp; input <= 90) {
return input 32;
} else if (input >= 97 amp;amp; input <= 122) {
return input - 32;
} else {
return input;
}
}
}
Запустите эту программу и посмотрите, сколько соединений вы могли бы установить.
public class RogueClient {
private static long noClients = 9000;
public static void main(String[] args) {
for (int i = 0; i < noClients; i ) {
try {
new Socket("localhost", 8080);
System.out.println("Connection No: " i);
} catch (IOException e) {
System.err.println("Exception: " e.getMessage() ", for connection: " i);
}
}
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Редактировать: размер пула должен зависеть от характера вашей программы. Если это задача, связанная с вводом-выводом, вы можете продолжить и создать много потоков. Но для программ, привязанных к процессору, количество потоков должно быть равно количеству ядер.
Ответ №2:
Потоки являются основным (и единственным) механизмом параллелизма в Java. ExecutorService
имеет методы для создания пулов потоков с произвольным количеством потоков; также CompletableFuture
s не ограничены ForkJoinPool.commonPool
. Те, которые снабжены количеством потоков = количеством ядер, являются просто удобными методами для задач, связанных с процессором.
Ответ №3:
Для работы с привязкой к вводу-выводу вы могли бы выделить больше потоков, как вы уже упоминали, пока они выполняют ввод-вывод, они блокируются, поэтому другие все еще могут использовать процессор.
Вы также можете взглянуть на Project Reactor, который может пригодиться, если вы не хотите вручную обрабатывать потоки с помощью этого конкретного планировщика: https://projectreactor.io/docs/core/release/api/reactor/core/scheduler/Schedulers.html#boundedElastic—