Как убедиться, что все потоки завершились перед вызовом notify?

#java #multithreading #thread-safety

#java #многопоточность #безопасность потоков

Вопрос:

Я пытаюсь убедиться, что все мои потоки класса A завершились до вызова notify. На данный момент, если один поток завершил свою работу, он вызывает notify (потоки класса B), в то время как другие потоки A все еще выполняются. Если это произойдет, B-потоки начнут работать, что изменит условие для оставшихся A-потоков.

Я пытался использовать блоки синхронизации, но, думаю, я использую их неправильно.

Идея в том, что A заполняет массив до его заполнения и уведомляет B, чтобы он мог снова очистить массив.

 public class A extends Thread {

    public static Object lockA = new Object();

    private ArrayList<String> list;

    public A(ArrayList<String> list) {
        this.list = list;
    }

    public void run(){

        while(true){
            synchronized (A.lockA){
                if(list.size() < 10){
                    list.add("A");
                    System.out.println(currentThread().getName()   " "   list);
                }else{
                    synchronized (B.lockB){
                        B.lockB.notifyAll();

                    }
                    return;
                }
            }

        }
    }

}
  
 public class B extends Thread {

    public static Object lockB = new Object();

    private ArrayList<String> list;

    public B(ArrayList<String> list) {
        this.list = list;
    }

    public void run(){


            synchronized (B.lockB){
                try {
                    B.lockB.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                while (list.size() > 0){
                    list.remove("A");
                    System.out.println(currentThread().getName()   " "   list);
                }
                return;
            }
    }
}
  
 public class Main {
    public static void main(String[] args) {
        ArrayList<String> list = new ArrayList<>();

        A a = new A(list);
        A aa = new A(list);
        A aaa = new A(list);
        B b = new B(list);
        B bb = new B(list);
        B bbb = new B(list);
        B bbbb = new B(list);

        a.start();
        aa.start();
        aaa.start();
        b.start();
        bb.start();
        bbb.start();
        bbbb.start();
    }
}
  

Ответ №1:

Если lockB не защищает какое-либо состояние, в котором поток может захотеть дождаться изменений или может потребоваться уведомить другой поток об изменениях, использовать его в сочетании с wait и notify не имеет никакого смысла.

         synchronized (B.lockB){
            try {
                B.lockB.wait();
  

Вы никогда не должны вызывать wait , пока не убедитесь, что то, чего вы ждете, еще не произошло. Причина, по которой у вас есть этот код внутри synchronized блока, заключается именно в том, что вы можете выполнить эту проверку и ждать, если и только если вам нужно подождать.

                 synchronized (B.lockB){
                    B.lockB.notifyAll();
                }
  

Это довольно сбивает с толку. Зачем вы вызываете lockB.notifyAll() , если вы не изменили ничего, защищенного этой блокировкой, о чем вам нужно уведомлять другие потоки?

Похоже, вы действительно не понимаете, как на самом деле работает wait / notify семантика. Дело в том, что вам нужно подождать, только если что-то еще не произошло, и вы никогда не можете быть уверены, произошло ли это уже (потому что планировщики непредсказуемы), не удерживая некоторую блокировку. Но вам нужно снять эту блокировку перед ожиданием, создав условие гонки, если какой-то другой поток сделает то, чего вы ждете, после того, как вы снимете блокировку, но до того, как вы начнете ждать. wait Функция — это атомарная функция для снятия блокировки и ожидания, пока что-то произойдет. Атомарность позволяет избежать этого состояния гонки.

Однако у вас нет разблокированной блокировки, защищающей то, чего вы ждете. Это разрушает логику атомарной функции разблокировки и ожидания и приводит к коду, который всегда может зайти в тупик.

Ответ №2:

Я бы исказил список в классе, который допускает потокобезопасные обновления списка :

 class ListWraper {
    //See also https://docs.oracle.com/javase/8/docs/api/java/util/Collections.html#synchronizedList-java.util.List-
    private volatile List<String> list;

    ListWraper() {
        list = new ArrayList<>();
    }

    //add to list
    synchronized boolean add(String s){
        return list.add(s);
    }

    //remove from list
    synchronized boolean remove(String s){
        return list.remove(s);
    }


    //get a defensive copy of list
    List<String> getList() {
        return new ArrayList<>(list);
    }
}
  

Поток, который использует ListWarper :

 class ListUpdater extends Thread {

    private static String abc = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
    private static int threadsCouter = 0;             //used for
    private final int threadNumber = threadsCouter  ; //printing only
    private final Random rd = new Random();
    private final ListWraper list;

    ListUpdater(ListWraper list) {
        this.list = list;
    }

    @Override
    public void run(){
         for(int i = 0; i < 10; i  ){
             //add random character to list 
             list.add(String.valueOf( abc.charAt(rd.nextInt(abc.length()))));
             try {
                TimeUnit.MILLISECONDS.sleep(100   rd.nextInt(500)); //simulate long process 
            } catch (InterruptedException ex) { ex.printStackTrace();   }
         }
         System.out.println("Thread number "   threadNumber   " finished");
    }
}
  

И протестируйте это с помощью:

 public class Main{

    public static void main(String args[]) throws InterruptedException {
        ListWraper list = new ListWraper();
        ListUpdater updater1 = new ListUpdater(list);
        ListUpdater updater2 = new ListUpdater(list);
        updater1.start();
        updater2.start();
        updater1.join(); //wait for thread to finish 
        updater2.join(); //wait for thread to finish 
        System.out.println("All finished ");
    }
}
  

Вывод:

Поток номер 1 завершен,
Поток номер 0 завершен,
Все завершено

Полный исполняемый код можно найти здесь.

Другой простой альтернативой является использование общего CountDownLatch, как показано здесь.

Ответ №3:

Я пытался использовать блоки синхронизации, но, думаю, я использую их неправильно.

В общем, да. Для синхронизации между несколькими потоками вам лучше использовать высокоуровневые примитивы, которые можно найти в пакете java.util.concurrent.

Я пытаюсь убедиться, что все мои потоки класса A завершились до вызова notify.

CountDownLatch подходит непосредственно к вашему случаю.