#java #multithreading #concurrency #threadpool #cyclicbarrier
#java #многопоточность #параллелизм #пул потоков #циклический барьер #threadpool #cyclicbarrier
Вопрос:
Я проводил несколько тестов с параллельной обработкой и создал программу, которая, учитывая матрицу целых чисел, повторно вычисляет значение каждой позиции на основе соседей.
Мне нужна была копия матрицы, чтобы значения не были переопределены, и я использовал a CyclicBarrier
для объединения результатов после решения частичных проблем:
CyclicBarrier cyclic_barrier = new CyclicBarrier(n_tasks 1, new Runnable() {
public void run() {
ParallelProcess.mergeResult();
}
});
ParallelProcess p = new ParallelProcess(cyclic_barrier, n_rows, r_cols); // init
Каждой задаче назначается часть матрицы: я разбиваю ее на равные части по строкам. Но может случиться так, что деления не являются точными, поэтому будет небольшой фрагмент, соответствующий последней строке, который не будет отправлен в пул потоков.
Пример: если у меня есть 16
строки и n_tasks = 4
нет проблем, все 4 будут отправлены в пул. Но если бы у меня было 18
вместо этого, первые 16 будут отправлены, но не последние два.
Поэтому я заставляю отправить, если это произойдет. Ну, на самом деле я не отправляю, потому что я использую фиксированный пул потоков, который я создал таким ExecutorService e = Executors.newFixedThreadPool(n_tasks)
образом. Поскольку все слоты в пуле заняты, а потоки заблокированы барьером ( mybarrier.await()
вызывается в run
методе) Я не мог отправить его в пул, поэтому я просто использовал Thread.start()
.
Давайте перейдем к делу. Поскольку мне нужно учитывать CyclicBarrier
возможность того, что этот фрагмент останется, количество сторон должно быть увеличено на 1.
Но если бы этого не произошло, мне не хватило бы одной стороны, чтобы вызвать барьер.
Каково мое решение?:
if (lower_limit != n_rows) { // the remaining chunk to be processed
Thread t = new Thread(new ParallelProcess(lower_limit, n_rows));
t.start();
t.join();
}
else {
cyclic_barrier.await();
}
Я чувствую, что обманываю, когда использую cyclic_barrier.await()
трюк, чтобы поднять барьер силой.
Есть ли какой-либо другой способ, которым я мог бы подойти к этой проблеме, чтобы мне не приходилось делать то, что я делаю?
Комментарии:
1. Как насчет этого: в рабочем, который последним прибывает к барьеру (проверка через значение, возвращаемое
barrier.await()
), вы можете проверить, остались ли элементы для продолжения. Если да, worker может отправить новую задачу для обработки оставшихся элементов. Возможно, вам все равно понадобится использовать дополнительную синхронизацию внутриmergeResult()
beyondbarrier.await()
— точно так же, как вы показываетеt.join()
.2. @VictorSorokin Я не понимаю, как проверка возвращаемого значения
await()
поможет мне проверить, есть ли левая часть.3. Разве значения
rows
иn_threads
не известны заранее (до того, как вы начнете отправлять задачи)?4. Да, конечно, они есть.
5. Тогда почему вы не можете добавить код в свой worker
run()
, который будет проверять,await()
возвращено ли значение 0 (последнее поступление, следовательно, барьер освобожден), и если оно равно 0, тогда worker проверит, были ли обработаны все элементы?
Ответ №1:
Хотя это не отвечает на ваш вопрос о CyclicBarriers, могу ли я рекомендовать использовать фазер? У этого есть возможность включать количество сторон, а также позволяет запускать mergeResult
при срабатывании фазы.
Итак, прежде чем выполнять асинхронное вычисление, просто register
. Затем внутри этого вычисления поток поступает на phaser
. Когда все потоки будут доставлены, он продвинет фазу и может вызвать переопределенный метод onAdvance
.
Представление:
ParallelProcess process = new ParallelProcess(lower_limit, n_rows));
phaser.register();
executor.submit(process);
Процессор
public void run(){
//do stuff
phaser.arrive();
}
Фазер
Phaser phaser = new Phaser(){
protected boolean onAdvance(int phase, int registeredParties) {
ParallelProcess.mergeResult();
return true;
}
}