AtomicInteger не работает должным образом в Java

#java #multithreading #concurrency

#java #многопоточность #параллелизм

Вопрос:

У меня есть потоки записи и чтения, хотя я использую AtomicBoolean и AtomicInteger, я мог видеть повторяющиеся значения в потоке чтения, пожалуйста, помогите мне найти, что не так с моим кодом.

 package automic;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class AutomicTest {

    public volatile AtomicBoolean isStopped = new AtomicBoolean(false);
    public AtomicInteger  count =  new AtomicInteger(0);
    
    public static void main(String[] args) throws InterruptedException {
        AutomicTest test = new AutomicTest();
        
        Thread writerThread = new Thread(() ->{
            while(!test.isStopped.get()) {
                test.count.incrementAndGet();
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        
        
        Thread readerThread = new Thread(() ->{
            while(!test.isStopped.get()) {
                System.out.println("Counter :" test.count.get());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        
        
        writerThread.start();
        readerThread.start();
        
        Thread.sleep(4000);
        
        test.isStopped.getAndSet(true);
        
        writerThread.join();
        readerThread.join();
    }
}



Counter :1
Counter :2
Counter :3 // duplicate
Counter :3 // duplicate
Counter :4
Counter :5
Counter :7
Counter :8
Counter :9
Counter :10
Counter :11
Counter :12 // duplicate
Counter :12 // duplicate
Counter :13
Counter :15 // duplicate
Counter :15 // duplicate
Counter :17
Counter :18
Counter :19
Counter :20
Counter :21
Counter :22
Counter :23
Counter :24
Counter :25
Counter :26
Counter :27
Counter :28
Counter :29
Counter :30
Counter :31
Counter :32
Counter :33
Counter :34
Counter :35
Counter :36
Counter :37
Counter :38
Counter :39
Counter :40
 

Комментарии:

1. Итак, читатель действовал дважды между приращением записи? Как вы думаете, почему это не «работает должным образом»

2. Определите синхронизацию . get() просто извлекает текущее значение, которое он видит в AtomicInteger .

3. Проблема не в том, что это AtomicInteger не работает. Это так. То, что вы видите, — это простая проблема с переплетением потоков. Ожидание 100 мс не означает, что вы будете планировать потоки в точном порядке. В зависимости от вашей ОС время может иметь худшую степень детализации, чем 1 мс. Даже если это не так, многое может произойти за один мс, и потоки, скорее всего, просто выполняются в одном мс, иногда меняя порядок. Если вы хотите, чтобы они синхронизировались тик-так, вам нужно использовать CountDownLatch или что-то подобное.

4. Два потока не обязательно выполняются синхронно. Представьте, что 1-й поток имеет преимущество в 150 миллисекунд. Это означало бы, что внутренний цикл 1-го потока (скорее всего) выполняется дважды, прежде чем 2-й поток выполнит свою первую итерацию.

5. Вы также можете видеть, что после дубликатов вы видите пропущенные числа. Это потому, что потоки снова изменили свой порядок, и читатель «пропустил» один удар. Это нормально, добро пожаловать в многопоточность.

Ответ №1:

Два больших вывода из этого заключаются в том, что:

  • Thread.sleep(100) не означает «Спящий режим в течение 100 мс, с точностью до наносекунды». Это немного менее точно, зависит от детализации и точности внутренних часов ОС, от планирования собственных потоков, других задач, выполняемых на компьютере. Даже цикл сна-пробуждения занимает некоторое (на удивление большое) количество времени.
  • Атомизация хороша, когда несколько потоков взаимодействуют независимо. Если вам нужно, чтобы они были как-то зависимыми и / или реагировали на действия других потоков, атомарные — это не то, что вам нужно. Вам нужно будет использовать реальный механизм синхронизации.

Поэтому вы не можете использовать sleep() и atomics для планирования выполнения двух потоков в идеально сбалансированном цикле тик-так.

Это происходит в вашем коде:

  1. Поток записи записывает значение 1 в момент времени 100, затем переходит в спящий режим.
  2. Поток чтения считывает значение 1 в момент времени 100, затем переходит в спящий режим. Они оба могут легко запускаться на одной и той же ms, в многоядерной системе они могут даже запускаться в одно и то же время, и они, вероятно, делают.
  3. Читатель просыпается во время 200.0 и снова считывает значение 1.
  4. Writer просыпается во время 200.02 и записывает значение 2. Упс, мы только что получили дубликат.

Обратите внимание, что потоки могут даже откидываться назад, в этом случае вы увидите пропущенное число в последовательности, и иногда вы это делаете. Чтобы сбалансировать потоки для запуска в идеальной схеме A-B-A-B, вы можете сделать, например, что-то вроде этого:

 public class AutomicTest {

    private volatile boolean isStopped = false;

    private final CyclicBarrier barrier = new CyclicBarrier(2);
    private int count = 0;

    public static void main(String[] args) throws InterruptedException {
        AutomicTest test = new AutomicTest();

        Thread writerThread = new Thread(() -> {
            while (!test.isStopped) {
                test.count  ;
                try {
                    test.barrier.await();
                    Thread.sleep(100);
                } catch (InterruptedException | BrokenBarrierException ignored) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });

        Thread readerThread = new Thread(() -> {
            while (!test.isStopped) {
                try {
                    test.barrier.await();
                    System.out.println("Counter: "   test.count);
                    Thread.sleep(100);
                } catch (InterruptedException | BrokenBarrierException ignored) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });

        writerThread.start();
        readerThread.start();

        Thread.sleep(4000);

        test.isStopped = true;

        writerThread.join();
        readerThread.join();
    }

}
 

Ключевым моментом здесь является CyclicBarrier то, что:

Средство синхронизации, которое позволяет набору потоков ожидать, пока все другие достигнут общей точки барьера. CyclicBarriers полезны в программах, включающих группу потоков фиксированного размера, которые должны время от времени ожидать друг друга. Барьер называется циклическим, потому что его можно повторно использовать после освобождения ожидающих потоков.

В этом случае барьер настроен на две синхронизированные стороны — писатель и читатель:

  • Автор сначала записывает свое значение, затем ожидает, пока все стороны достигнут барьера (другими словами, он ожидает, пока читатель прочитает значение).
  • Читатель сначала ждет, пока все стороны достигнут барьера (другими словами, он ждет, пока автор запишет новое значение), только затем считывает значение.

В этой схеме count видимость значения обеспечивается с помощью CyclicBarrier , поэтому вам даже не нужен AtomicInteger here . Более конкретно:

Действия в потоке до вызова await() происходят — перед […] действиями после успешного возврата из соответствующего await() в других потоках.

О, и isStopped также не нужен AtomicBoolean , volatile достаточно. Но это будет работать в любом случае. Извините, я понимаю, что это должно было быть задачей для практики atomics, но они не являются хорошим инструментом, если вам нужно, чтобы потоки ждали друг друга.


Примечание: приведенный выше механизм по-прежнему не совсем корректен при удалении sleep() вызовов. Причина этого в том, что после освобождения считыватель конкурирует с записывающим устройством на следующей итерации цикла. Чтобы исправить это, автор должен дождаться завершения предыдущего чтения, а читатель должен дождаться завершения записи. Этого можно добиться, используя второй барьер или, возможно Phaser , тот, который я намеренно не использовал в приведенном выше примере, поскольку он более продвинутый, и вам нужно изучить CyclicBarriers и CountDownLatches, прежде чем переходить к фазерам. Также необходимо настроить механизм завершения работы. Удачи!

РЕДАКТИРОВАТЬ: я на самом деле написал решение без sleep() двойного фазирования и обнаружил, что его намного легче читать (если вас не волнует прерывание длительной задачи, которое вы обычно должны!) и намного быстрее, чем эквивалентное решение CyclicBarrier. Итак, мы оба кое-что узнали сегодня. Вот оно:

 public class AutomicTest {

    private volatile boolean isStopped = false;

    private final Phaser valueWritten = new Phaser(2);
    private final Phaser valueRead = new Phaser(2);
    private int count = 0;

    public static void main(String[] args) throws InterruptedException {
        AutomicTest test = new AutomicTest();

        Thread writerThread = new Thread(() -> {
            while (!test.isStopped) {
                // wait for the previous value to be read
                test.valueRead.arriveAndAwaitAdvance();
                test.count  ;
                // acknowledge the write
                test.valueWritten.arrive();
            }
        });

        Thread readerThread = new Thread(() -> {
            while (!test.isStopped) {
                // wait for the value to be written
                test.valueWritten.arriveAndAwaitAdvance();
                System.out.println("Counter: "   test.count);
                // acknowledge the read
                test.valueRead.arrive();
            }
        });

        writerThread.start();
        readerThread.start();
        test.valueRead.arrive(); // start the writer

        Thread.sleep(4000);

        test.isStopped = true;
        test.valueRead.forceTermination();
        test.valueWritten.forceTermination();

        writerThread.join();
        readerThread.join();
    }

}
 

Комментарии:

1. Большое спасибо, это было отличное исправление и подробное объяснение.

2. @sunleo Добро пожаловать! Я только что добавил решение no-sleep(), потому что узнал, что оно действительно может выглядеть красиво и быть очень быстрым.