#java #multithreading #concurrency
#java #многопоточность #параллелизм
Вопрос:
У меня есть потоки записи и чтения, хотя я использую AtomicBoolean и AtomicInteger, я мог видеть повторяющиеся значения в потоке чтения, пожалуйста, помогите мне найти, что не так с моим кодом.
package automic;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class AutomicTest {
public volatile AtomicBoolean isStopped = new AtomicBoolean(false);
public AtomicInteger count = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
AutomicTest test = new AutomicTest();
Thread writerThread = new Thread(() ->{
while(!test.isStopped.get()) {
test.count.incrementAndGet();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread readerThread = new Thread(() ->{
while(!test.isStopped.get()) {
System.out.println("Counter :" test.count.get());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
writerThread.start();
readerThread.start();
Thread.sleep(4000);
test.isStopped.getAndSet(true);
writerThread.join();
readerThread.join();
}
}
Counter :1
Counter :2
Counter :3 // duplicate
Counter :3 // duplicate
Counter :4
Counter :5
Counter :7
Counter :8
Counter :9
Counter :10
Counter :11
Counter :12 // duplicate
Counter :12 // duplicate
Counter :13
Counter :15 // duplicate
Counter :15 // duplicate
Counter :17
Counter :18
Counter :19
Counter :20
Counter :21
Counter :22
Counter :23
Counter :24
Counter :25
Counter :26
Counter :27
Counter :28
Counter :29
Counter :30
Counter :31
Counter :32
Counter :33
Counter :34
Counter :35
Counter :36
Counter :37
Counter :38
Counter :39
Counter :40
Комментарии:
1. Итак, читатель действовал дважды между приращением записи? Как вы думаете, почему это не «работает должным образом»
2. Определите синхронизацию .
get()
просто извлекает текущее значение, которое он видит вAtomicInteger
.3. Проблема не в том, что это
AtomicInteger
не работает. Это так. То, что вы видите, — это простая проблема с переплетением потоков. Ожидание 100 мс не означает, что вы будете планировать потоки в точном порядке. В зависимости от вашей ОС время может иметь худшую степень детализации, чем 1 мс. Даже если это не так, многое может произойти за один мс, и потоки, скорее всего, просто выполняются в одном мс, иногда меняя порядок. Если вы хотите, чтобы они синхронизировались тик-так, вам нужно использоватьCountDownLatch
или что-то подобное.4. Два потока не обязательно выполняются синхронно. Представьте, что 1-й поток имеет преимущество в 150 миллисекунд. Это означало бы, что внутренний цикл 1-го потока (скорее всего) выполняется дважды, прежде чем 2-й поток выполнит свою первую итерацию.
5. Вы также можете видеть, что после дубликатов вы видите пропущенные числа. Это потому, что потоки снова изменили свой порядок, и читатель «пропустил» один удар. Это нормально, добро пожаловать в многопоточность.
Ответ №1:
Два больших вывода из этого заключаются в том, что:
Thread.sleep(100)
не означает «Спящий режим в течение 100 мс, с точностью до наносекунды». Это немного менее точно, зависит от детализации и точности внутренних часов ОС, от планирования собственных потоков, других задач, выполняемых на компьютере. Даже цикл сна-пробуждения занимает некоторое (на удивление большое) количество времени.- Атомизация хороша, когда несколько потоков взаимодействуют независимо. Если вам нужно, чтобы они были как-то зависимыми и / или реагировали на действия других потоков, атомарные — это не то, что вам нужно. Вам нужно будет использовать реальный механизм синхронизации.
Поэтому вы не можете использовать sleep()
и atomics для планирования выполнения двух потоков в идеально сбалансированном цикле тик-так.
Это происходит в вашем коде:
- Поток записи записывает значение 1 в момент времени 100, затем переходит в спящий режим.
- Поток чтения считывает значение 1 в момент времени 100, затем переходит в спящий режим. Они оба могут легко запускаться на одной и той же ms, в многоядерной системе они могут даже запускаться в одно и то же время, и они, вероятно, делают.
- Читатель просыпается во время 200.0 и снова считывает значение 1.
- Writer просыпается во время 200.02 и записывает значение 2. Упс, мы только что получили дубликат.
Обратите внимание, что потоки могут даже откидываться назад, в этом случае вы увидите пропущенное число в последовательности, и иногда вы это делаете. Чтобы сбалансировать потоки для запуска в идеальной схеме A-B-A-B, вы можете сделать, например, что-то вроде этого:
public class AutomicTest {
private volatile boolean isStopped = false;
private final CyclicBarrier barrier = new CyclicBarrier(2);
private int count = 0;
public static void main(String[] args) throws InterruptedException {
AutomicTest test = new AutomicTest();
Thread writerThread = new Thread(() -> {
while (!test.isStopped) {
test.count ;
try {
test.barrier.await();
Thread.sleep(100);
} catch (InterruptedException | BrokenBarrierException ignored) {
Thread.currentThread().interrupt();
break;
}
}
});
Thread readerThread = new Thread(() -> {
while (!test.isStopped) {
try {
test.barrier.await();
System.out.println("Counter: " test.count);
Thread.sleep(100);
} catch (InterruptedException | BrokenBarrierException ignored) {
Thread.currentThread().interrupt();
break;
}
}
});
writerThread.start();
readerThread.start();
Thread.sleep(4000);
test.isStopped = true;
writerThread.join();
readerThread.join();
}
}
Ключевым моментом здесь является CyclicBarrier
то, что:
Средство синхронизации, которое позволяет набору потоков ожидать, пока все другие достигнут общей точки барьера. CyclicBarriers полезны в программах, включающих группу потоков фиксированного размера, которые должны время от времени ожидать друг друга. Барьер называется циклическим, потому что его можно повторно использовать после освобождения ожидающих потоков.
В этом случае барьер настроен на две синхронизированные стороны — писатель и читатель:
- Автор сначала записывает свое значение, затем ожидает, пока все стороны достигнут барьера (другими словами, он ожидает, пока читатель прочитает значение).
- Читатель сначала ждет, пока все стороны достигнут барьера (другими словами, он ждет, пока автор запишет новое значение), только затем считывает значение.
В этой схеме count
видимость значения обеспечивается с помощью CyclicBarrier
, поэтому вам даже не нужен AtomicInteger
here . Более конкретно:
Действия в потоке до вызова
await()
происходят — перед […] действиями после успешного возврата из соответствующегоawait()
в других потоках.
О, и isStopped
также не нужен AtomicBoolean
, volatile
достаточно. Но это будет работать в любом случае. Извините, я понимаю, что это должно было быть задачей для практики atomics, но они не являются хорошим инструментом, если вам нужно, чтобы потоки ждали друг друга.
Примечание: приведенный выше механизм по-прежнему не совсем корректен при удалении sleep()
вызовов. Причина этого в том, что после освобождения считыватель конкурирует с записывающим устройством на следующей итерации цикла. Чтобы исправить это, автор должен дождаться завершения предыдущего чтения, а читатель должен дождаться завершения записи. Этого можно добиться, используя второй барьер или, возможно Phaser
, тот, который я намеренно не использовал в приведенном выше примере, поскольку он более продвинутый, и вам нужно изучить CyclicBarriers и CountDownLatches, прежде чем переходить к фазерам. Также необходимо настроить механизм завершения работы. Удачи!
РЕДАКТИРОВАТЬ: я на самом деле написал решение без sleep()
двойного фазирования и обнаружил, что его намного легче читать (если вас не волнует прерывание длительной задачи, которое вы обычно должны!) и намного быстрее, чем эквивалентное решение CyclicBarrier. Итак, мы оба кое-что узнали сегодня. Вот оно:
public class AutomicTest {
private volatile boolean isStopped = false;
private final Phaser valueWritten = new Phaser(2);
private final Phaser valueRead = new Phaser(2);
private int count = 0;
public static void main(String[] args) throws InterruptedException {
AutomicTest test = new AutomicTest();
Thread writerThread = new Thread(() -> {
while (!test.isStopped) {
// wait for the previous value to be read
test.valueRead.arriveAndAwaitAdvance();
test.count ;
// acknowledge the write
test.valueWritten.arrive();
}
});
Thread readerThread = new Thread(() -> {
while (!test.isStopped) {
// wait for the value to be written
test.valueWritten.arriveAndAwaitAdvance();
System.out.println("Counter: " test.count);
// acknowledge the read
test.valueRead.arrive();
}
});
writerThread.start();
readerThread.start();
test.valueRead.arrive(); // start the writer
Thread.sleep(4000);
test.isStopped = true;
test.valueRead.forceTermination();
test.valueWritten.forceTermination();
writerThread.join();
readerThread.join();
}
}
Комментарии:
1. Большое спасибо, это было отличное исправление и подробное объяснение.
2. @sunleo Добро пожаловать! Я только что добавил решение no-sleep(), потому что узнал, что оно действительно может выглядеть красиво и быть очень быстрым.