#java #concurrency
Вопрос:
У меня есть собственный писатель, как показано ниже:
public class MyWriter{
public void write(StuffToWrite stuff){
/*
do the write here
*/
}
public void close(){
/*
do the close here
*/
}
}
который будет доступен нескольким параллельным потокам, без проблем, если метод записи называется concurrently.
Я хочу убедиться, что метод close никогда не выполняется, если какой-либо поток находится в середине/или только начинает запись и ожидает завершения каждой уже начатой записи. Как это дополнить?
P.S. Одно из предложений-использовать флаг AtomicInteger, который будет увеличен/уменьшен на 2 при записи и увеличен на 1 в закрытом методе, что-то вроде этого:
public class MyWriter
{
AtomicInteger counter = new AtomicInterger (0);
public void write (StuffToWrite stuff)
{
counterValue = counter.getAndAdd (2);
if (counterValue % 2 != 0)
{
throw new RuntimeException ("Already closed");
}
else
{
/*
do the write here
*/
counterValue = counter.getAndAdd (-2);
if (counterValue == 1)
{
doClose ();
}
}
}
public void close ()
{
int counterValue = counter.get ();
if (counterValue % 2 != 0)
return; //some thread already closed it
else if (counterValue > 0)
{ //eben and > 0, so in middle of write
counter.getAndAdd (1);
}
else
{ // == 1, so no pending writes and should be closed
counter.getAndAdd (1);
doClose ();
}
}
private void doClose ()
{
/*
do the close here
*/
}
}
PS2:
В настоящее время это решение (комбинация блокирующих/неблокирующих алгоритмов) работает для меня:
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class MyWriter {
Queue<String> globalQueue = new ConcurrentLinkedQueue<String>();
AtomicBoolean isClosed1 = new AtomicBoolean(false);
AtomicBoolean isClosed2 = new AtomicBoolean(false);
AtomicInteger counter = new AtomicInteger(0);
public void write(List<String> toBeWrote) {
for (; ; ) {// a non atomic check-then-modify.
int old_state = counter.get();
if (isOdd(old_state)) {
throw new ClosedGracefullyException("gracefully closed");
}
int new_state = old_state 2;
if (counter.compareAndSet(old_state, new_state)) break;
}
doTheWrite(toBeWrote);
int counterValue = counter.getAndAdd(-2);
if (counterValue == 1) {
doClose();
}
}
private boolean isOdd(int old_state) {
return old_state % 2 != 0;
}
private boolean isEven(int old_state) {
return old_state % 2 == 0;
}
private void doTheWrite(List<String> toBeWrote) {
for (String s : toBeWrote) {
globalQueue.add(s);
if (isFullyClosed()) {
throw new ClosedUnexpectedlyException("this writer closed!!");
}
}
}
public void synchronized close() {
int old_state = counter.get();
if (isOdd(old_state)) {
return;
} else if (counter.compareAndSet(0, 1)) {
doClose();
} else { //even and > 0, so in middle of write
counter.getAndAdd(1);
}
}
private void doClose() {
isClosed1.getAndSet(true);
isClosed2.getAndSet(true);
}
private boolean isFullyClosed() {
return isClosed1.get() amp;amp; isClosed2.get();
}
public List<String> getContents() {
return Arrays.asList(globalQueue.toArray(new String[0]));
}
}
Я использую этот модульный тест для проверки решения:
@Test
public void testConcurrency() throws Exception {
int numberOfWriters = 10;
int numberOfClosers = 3;
final int maxSleepMillis = 10;
int numberOfRuns = 500;
long sumDuration = 0L;
long start = System.currentTimeMillis();
for (int k = 0; k < numberOfRuns; k ) {
myWriter = new MyWriter();
Thread writers[] = new Thread[numberOfWriters];
Thread closers[] = new Thread[numberOfClosers];
final boolean[] exceptionHappened = {false};
final int[] closedExceptionCount = {0};
for (int i = 0; i < numberOfWriters; i ) {
final int finalI = i;
writers[i] = new Thread(new Runnable() {
public void run() {
sleep(maxSleepMillis);
try {
myWriter.write(Arrays.asList("test" finalI));
} catch (ClosedUnexpectedlyException e){
exceptionHappened[0] = true;
} catch (ClosedGracefullyException cwe){
//OK
closedExceptionCount[0] ;
}
}
});
}
for (int i = 0; i < numberOfClosers; i ) {
final int finalI = i;
closers[i] = new Thread(new Runnable() {
public void run() {
sleep(maxSleepMillis);
myWriter.close();
}
});
}
for (Thread writer : writers) {
writer.start();
}
for (Thread closer : closers) {
closer.start();
}
for (Thread writer : writers) {
writer.join();
}
for (Thread closer : closers) {
closer.join();
}
long avgDuration = (System.currentTimeMillis() - start) / (k 1);
System.out.println(String.format("run %d, closed Ex: %d , avgDuration: %d" , k, closedExceptionCount[0], avgDuration));
assertFalse(exceptionHappened[0]);
}
}
private void sleep(int maxSleepMillis) {
try {
Thread.sleep((long) (Math.random() * maxSleepMillis));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Комментарии:
1. Ключевая часть заключается в том,что вам нужно вставить его в цикл for, и вы получите что-то вроде: for(true){long state_state = state.get(); long new_state = … ; если(state.compareAndSet(old_state, new_new_state)) сломается;}
2. Ситуация, которую вы хотите предотвратить,-это неатомная проверка, а затем изменение. В противном случае вы окажетесь в состоянии гонки.
3. У вас также есть проблема с написанием. Представьте, что происходит, когда вызывается close; тогда вы увеличите счетчик на 2, даже если вы не входили. Поэтому я бы также сделал это, используя цикл for, как я описал в своем первом комментарии.
4. Извините, но я не понимаю, как именно использовать цикл for в моем коде, а также как проверить условие % 2 с помощью сравнения и установки.
5. для(;;}{long old_state = state.get(); если (old_state нечетное) выбросить исключение(); long new_state = old_state 2; если(state.compare_and_set(old_state;new_state)) прервется;}
Ответ №1:
Проще всего было бы синхронизировать все методы. Это предотвратит любое одновременное закрытие с записью. Это также предотвратит одновременную запись между строк.
Если вы действительно хотите одновременной записи, то просто установите флаг и позвольте авторам проверить этот флаг, как только они выйдут из метода. Таким образом, последняя нить, которая покидает здание, может выключить свет.
Общая идея: вы могли бы закодировать эту информацию в атомарном формате. Например, каждый раз, когда кто-то вводит запись, счетчик увеличивается на 2, и каждый раз, когда поток покидает метод записи, он уменьшает счетчик на 2.
Если поток хочет ввести запись и видит, что счетчик нечетный, он может вернуть или выдать исключение. Все, что тебе заблагорассудится.
Если поток завершает запись, он может уменьшить счетчик на 2, а если остается 1, это означает, что этот поток последним покидает метод записи и может завершить закрытие.
Если поток хочет закрыться, а счетчик четный и больше 0, увеличьте счетчик на 1. Если счетчик уже является нечетным, то кто-то другой уже вызвал метод close, и никаких дальнейших действий не требуется (убедитесь, что вы больше не увеличиваете значение на 1!). Если счетчик был равен нулю, то записи отсутствуют, и вы можете завершить закрытие из вызывающего потока (убедитесь, что вы присвоили значение 1, чтобы предотвратить условия гонки).
Вы можете выполнять эти передачи состояний атомарно, чтобы предотвратить любое состояние гонки с помощью https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/AtomicLong.html#compareAndSet-long-long-
Комментарии:
1. Должно ли это быть AtomicLong, или AtomicInteger тоже в порядке?
2. Также я не понимаю, как два отдельно синхронизированных метода также будут координировать друг друга? (хотя та часть, которая не позволяет мне одновременно писать, не очень хороша)
3. Это нормально, если у вас нет более 2^30 потоков 🙂
4. Если ваша логика написания действительно параллельна, то вам не нужно иметь никакой другой синхронизации для работы с логикой закрытия. Так что AtomicInteger/Long было бы достаточно.
5. Спасибо за умную общую идею, я ее протестирую, но мне тоже было любопытно узнать о синхронизированных методах
Ответ №2:
public class MyWriter{
private final static Lock LOCK = new ReentrantLock();
public void write(StuffToWrite stuff){
LOCK.lock();
/*
do the write here
*/
}
public void close(){
/*
do the close here
*/
LOCK.unlock();
}
}
Комментарии:
1. Это не позволит одновременной записи или многократной записи без закрытия..