Как заблокировать доступ на запись к массиву из потока во время чтения

#java #multithreading #concurrency

#java #многопоточность #параллелизм

Вопрос:

У меня есть два потока, работающих параллельно, и чтобы получить информацию об их внутренних результатах, я создал массив int длиной 8. Что касается их идентификатора, они могут обновлять относительную область в массиве statu. Им не разрешается записывать в чужую область. Более того, чтобы корректно получать и отображать массив statu, я пытаюсь написать метод getStatu. При получении результата я хочу заблокировать другим пользователям запись в массив statu; к сожалению, я не понимаю, как заблокировать другим пользователям запись в массив statu, пока я получаю и отображаю результат в методе getStatu. Как?

Примечание: Если есть часть, которая вызывает неправильное понимание, скажите мне, мой друг, я исправлю

 class A{
    Semaphore semaphore;
    int [] statu;  // statu is of length 8

    void update(int idOfThread, int []statu_){
        try {
            semaphore.acquire();
            int idx = idOfThread * 4;

            statu[idx] = statu_[0];
            statu[idx 1] = statu_[1];
            statu[idx 2] = statu_[2];
            statu[idx 3] = statu_[3];
        } catch (...) {

        } finally {
            semaphore.release();
        }
    }

    int[] getStatu(){
        // Block write access of threads
        //    display statu array
        //    return statu array as return value
        // release block, so threads can write to the array

    }
}
  

Комментарии:

1. Вы могли бы выполнить эту работу с помощью java.util.concurrent.ConcurrentHashMap . Но, возможно, вам не следует. Этим вы бы внесли много разногласий и уменьшили параллелизм ваших потоков … и да. Либо A является a singleton , либо statu и semaphore являются статическими. В противном случае ваши потоки не синхронизировались бы, поскольку они не использовали бы общую инфраструктуру.

2. Почему вы не используете ReentrantLock ?

3. @blafasel не могли бы вы написать демонстрационный сегмент кода, и я хочу уменьшить параллелизм двух потоков. На самом деле, почему я должен? Они обновляют два разных места.

4. @prajeeshkumar Я не понимаю, как ReentrantLock это поможет.

5. Используйте ReentrantReadWriteLock . update метод выполнит свою работу в, rwLock.writeLock().lock() пока getStatu будет обращаться к массиву в rwLock.readLock().lock() .

Ответ №1:

Помимо использования другого механизма блокировки / snc, отличного от семафора, просто предложение немного улучшить это.

Объединение обоих массивов status[4] в один массив [8] — не лучшее решение. Рассмотрим задачу A, записывающую свой квадруплет: она должна заблокировать задачу B, читающую то же самое, но нет смысла блокировать задачу B, записывающую квадруплет B, и наоборот.

Вообще говоря, степень детализации того, что блокируется, является одним из важных факторов: блокировка всей базы данных — это бессмыслица (за исключением общей обработки, такой как резервное копирование), однако блокировка отдельных полей записи привела бы к чрезмерным накладным расходам.

Ответ №2:

Возможно, есть лучшие способы добраться туда, куда вы хотите, но только вы знаете, что пытаетесь сделать. Следуя вашей собственной схеме, есть вещи, которые вы делаете неправильно. Во-первых, в настоящее время вы не достигаете детальной блокировки, которую планируете. Для этого у вас должен быть массив семафоров. Таким образом, приобретение будет выглядеть примерно так

 semaphore[idOfThread].acquire();
  

Во-вторых, одна вещь, которую вы не осознали, заключается в том, что контролируемый доступ к данным между потоками — это совместная деятельность. Вы не можете заблокировать один поток и не заботиться о том, чтобы справиться с блокировкой в другом и каким-то образом навязать контроль доступа.

Поэтому, если вызывающий ваш getStatu() не будет использовать тот же набор семафоров при проверке массива, лучше всего, чтобы getStatu() создал новый массив int[] , копируя сегменты каждого потока после блокировки соответствующим семафором. Таким образом, массив, возвращаемый getStatu(), будет моментальным снимком в точке вызова.

Ответ №3:

Пожалуйста, попробуйте приведенный ниже код, он будет работать для вас. вызовите afterStatu() в нем.

 class A {

    Semaphore semaphore;
    int[] statu; // statu is of length 8

    private boolean stuck;

    public A() {
    }

    void update(int idOfThread, int[] statu_) {

        // if true, control will not go further
        while (stuck);

        try {
            semaphore.acquire();
            int idx = idOfThread * 4;

            statu[idx] = statu_[0];
            statu[idx   1] = statu_[1];
            statu[idx   2] = statu_[2];
            statu[idx   3] = statu_[3];
        } catch (Exception e) {

        } finally {
            semaphore.release();
        }
    }

    int[] getStatu() {
        // Block write access of threads
        stuck = true;
        // display statu array
        for (int eachStatu : statu) {
            System.out.println(eachStatu);
        }

        // return statu array as return value
        return statu;
    }

    public void afterStatu() {
        getStatu();
         // release block, so threads can write to the array
        stuck = false;
    }

}
  

Ответ №4:

 ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
int[] statu;

void update() {
    lock.writeLock().lock();
    try {
        // update statu
    } finally {
        lock.writeLock().unlock();
    }
}

int[] getStatu() {
    lock.readLock().lock();
    try {
        // return statu
    } finally {
        lock.readLock().unlock();
    }
}
  

Ответ №5:

Как и сказал ac3, только вы знаете, что пытаетесь сделать.

Вот решение, которое может оказаться полезным в случае, когда каждый поток, вызывающий update(), делает это часто, а вызовы getStatu() редки.Это сложно, но позволяет большинству вызовов update() выполняться вообще без какой-либо блокировки.

 static final int NUMBER_OF_WORKER_THREADS = ...;
final AtomicReference<CountDownLatch> pauseRequested = new AtomicReference<CountDownLatch>(null);
final Object lock = new Object();

int[] statu = ...

//called in "worker" thread.
void update() {
    if (pauseRequested.get() != null) {
        pause();
    }
    ... update my slots in statu[] array ...
}

private void pause() {
    notifyMasterThatIAmPaused();
    waitForMasterToLiftPauseRequest();
}

private void notifyMasterThatIAmPaused() {
    pauseRequested.get().countDown();
}

private void waitForMasterToLiftPauseRequest() {
    synchronized(lock) {
        while (pauseRequested.get() != null) {
            lock.wait();
        }
    }
}

//called in "master" thread

int[] getStatu( ) {
    int[] resu<
    CountDownLatch cdl = requestWorkersToPause();
    waitForWorkersToPause(cdl);
    result = Arrays.copyOf(statu, statu.length);
    liftPauseRequest();
    return resu<
}

private CountDownLatch requestWorkersToPause() {
    cdl = new CountDownLatch(NUMBER_OF_WORKER_THREADS);
    pauseRequested.set(cdl);
    return cdl;
}

private void waitForWorkersToPause(CountDownLatch cdl) {
    cdl.await();
}

private void liftPauseRequest() {
    synchronized(lock) {
        pauseRequested.set(null);
        lock.notifyAll();
    }
}