#java #multithreading #concurrency
#java #многопоточность #параллелизм
Вопрос:
У меня есть два потока, работающих параллельно, и чтобы получить информацию об их внутренних результатах, я создал массив int длиной 8. Что касается их идентификатора, они могут обновлять относительную область в массиве statu. Им не разрешается записывать в чужую область. Более того, чтобы корректно получать и отображать массив statu, я пытаюсь написать метод getStatu. При получении результата я хочу заблокировать другим пользователям запись в массив statu; к сожалению, я не понимаю, как заблокировать другим пользователям запись в массив statu, пока я получаю и отображаю результат в методе getStatu. Как?
Примечание: Если есть часть, которая вызывает неправильное понимание, скажите мне, мой друг, я исправлю
class A{
Semaphore semaphore;
int [] statu; // statu is of length 8
void update(int idOfThread, int []statu_){
try {
semaphore.acquire();
int idx = idOfThread * 4;
statu[idx] = statu_[0];
statu[idx 1] = statu_[1];
statu[idx 2] = statu_[2];
statu[idx 3] = statu_[3];
} catch (...) {
} finally {
semaphore.release();
}
}
int[] getStatu(){
// Block write access of threads
// display statu array
// return statu array as return value
// release block, so threads can write to the array
}
}
Комментарии:
1. Вы могли бы выполнить эту работу с помощью
java.util.concurrent.ConcurrentHashMap
. Но, возможно, вам не следует. Этим вы бы внесли много разногласий и уменьшили параллелизм ваших потоков … и да. Либо A является asingleton
, либоstatu
иsemaphore
являются статическими. В противном случае ваши потоки не синхронизировались бы, поскольку они не использовали бы общую инфраструктуру.2. Почему вы не используете
ReentrantLock
?3. @blafasel не могли бы вы написать демонстрационный сегмент кода, и я хочу уменьшить параллелизм двух потоков. На самом деле, почему я должен? Они обновляют два разных места.
4. @prajeeshkumar Я не понимаю, как
ReentrantLock
это поможет.5. Используйте
ReentrantReadWriteLock
.update
метод выполнит свою работу в,rwLock.writeLock().lock()
покаgetStatu
будет обращаться к массиву вrwLock.readLock().lock()
.
Ответ №1:
Помимо использования другого механизма блокировки / snc, отличного от семафора, просто предложение немного улучшить это.
Объединение обоих массивов status[4] в один массив [8] — не лучшее решение. Рассмотрим задачу A, записывающую свой квадруплет: она должна заблокировать задачу B, читающую то же самое, но нет смысла блокировать задачу B, записывающую квадруплет B, и наоборот.
Вообще говоря, степень детализации того, что блокируется, является одним из важных факторов: блокировка всей базы данных — это бессмыслица (за исключением общей обработки, такой как резервное копирование), однако блокировка отдельных полей записи привела бы к чрезмерным накладным расходам.
Ответ №2:
Возможно, есть лучшие способы добраться туда, куда вы хотите, но только вы знаете, что пытаетесь сделать. Следуя вашей собственной схеме, есть вещи, которые вы делаете неправильно. Во-первых, в настоящее время вы не достигаете детальной блокировки, которую планируете. Для этого у вас должен быть массив семафоров. Таким образом, приобретение будет выглядеть примерно так
semaphore[idOfThread].acquire();
Во-вторых, одна вещь, которую вы не осознали, заключается в том, что контролируемый доступ к данным между потоками — это совместная деятельность. Вы не можете заблокировать один поток и не заботиться о том, чтобы справиться с блокировкой в другом и каким-то образом навязать контроль доступа.
Поэтому, если вызывающий ваш getStatu() не будет использовать тот же набор семафоров при проверке массива, лучше всего, чтобы getStatu() создал новый массив int[] , копируя сегменты каждого потока после блокировки соответствующим семафором. Таким образом, массив, возвращаемый getStatu(), будет моментальным снимком в точке вызова.
Ответ №3:
Пожалуйста, попробуйте приведенный ниже код, он будет работать для вас. вызовите afterStatu()
в нем.
class A {
Semaphore semaphore;
int[] statu; // statu is of length 8
private boolean stuck;
public A() {
}
void update(int idOfThread, int[] statu_) {
// if true, control will not go further
while (stuck);
try {
semaphore.acquire();
int idx = idOfThread * 4;
statu[idx] = statu_[0];
statu[idx 1] = statu_[1];
statu[idx 2] = statu_[2];
statu[idx 3] = statu_[3];
} catch (Exception e) {
} finally {
semaphore.release();
}
}
int[] getStatu() {
// Block write access of threads
stuck = true;
// display statu array
for (int eachStatu : statu) {
System.out.println(eachStatu);
}
// return statu array as return value
return statu;
}
public void afterStatu() {
getStatu();
// release block, so threads can write to the array
stuck = false;
}
}
Ответ №4:
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
int[] statu;
void update() {
lock.writeLock().lock();
try {
// update statu
} finally {
lock.writeLock().unlock();
}
}
int[] getStatu() {
lock.readLock().lock();
try {
// return statu
} finally {
lock.readLock().unlock();
}
}
Ответ №5:
Как и сказал ac3, только вы знаете, что пытаетесь сделать.
Вот решение, которое может оказаться полезным в случае, когда каждый поток, вызывающий update(), делает это часто, а вызовы getStatu() редки.Это сложно, но позволяет большинству вызовов update() выполняться вообще без какой-либо блокировки.
static final int NUMBER_OF_WORKER_THREADS = ...;
final AtomicReference<CountDownLatch> pauseRequested = new AtomicReference<CountDownLatch>(null);
final Object lock = new Object();
int[] statu = ...
//called in "worker" thread.
void update() {
if (pauseRequested.get() != null) {
pause();
}
... update my slots in statu[] array ...
}
private void pause() {
notifyMasterThatIAmPaused();
waitForMasterToLiftPauseRequest();
}
private void notifyMasterThatIAmPaused() {
pauseRequested.get().countDown();
}
private void waitForMasterToLiftPauseRequest() {
synchronized(lock) {
while (pauseRequested.get() != null) {
lock.wait();
}
}
}
//called in "master" thread
int[] getStatu( ) {
int[] resu<
CountDownLatch cdl = requestWorkersToPause();
waitForWorkersToPause(cdl);
result = Arrays.copyOf(statu, statu.length);
liftPauseRequest();
return resu<
}
private CountDownLatch requestWorkersToPause() {
cdl = new CountDownLatch(NUMBER_OF_WORKER_THREADS);
pauseRequested.set(cdl);
return cdl;
}
private void waitForWorkersToPause(CountDownLatch cdl) {
cdl.await();
}
private void liftPauseRequest() {
synchronized(lock) {
pauseRequested.set(null);
lock.notifyAll();
}
}