Как синхронизировать одновременную запись и закрытие

#java #concurrency

Вопрос:

У меня есть собственный писатель, как показано ниже:

 public class MyWriter{
public void write(StuffToWrite stuff){
/*
do the write here
*/
}

public void close(){
/*
do the close here
*/
}
}
 

который будет доступен нескольким параллельным потокам, без проблем, если метод записи называется concurrently.
Я хочу убедиться, что метод close никогда не выполняется, если какой-либо поток находится в середине/или только начинает запись и ожидает завершения каждой уже начатой записи. Как это дополнить?

P.S. Одно из предложений-использовать флаг AtomicInteger, который будет увеличен/уменьшен на 2 при записи и увеличен на 1 в закрытом методе, что-то вроде этого:

    public class MyWriter
{

  AtomicInteger counter = new AtomicInterger (0);

  public void write (StuffToWrite stuff)
  {
    counterValue = counter.getAndAdd (2);
    if (counterValue % 2 != 0)
      {
    throw new RuntimeException ("Already closed");
      }
    else
      {
    /*
       do the write here
     */
    counterValue = counter.getAndAdd (-2);
    if (counterValue == 1)
      {
        doClose ();
      }
      }
  }

  public void close ()
  {
    int counterValue = counter.get ();
    if (counterValue % 2 != 0)
      return;           //some thread already closed it
    else if (counterValue > 0)
      {             //eben and > 0, so in middle of write
    counter.getAndAdd (1);
      }
    else
      {             // == 1, so no pending writes and should be closed
    counter.getAndAdd (1);
    doClose ();
      }
  }

  private void doClose ()
  {
    /*
       do the close here
     */
  }

}
 

PS2:
В настоящее время это решение (комбинация блокирующих/неблокирующих алгоритмов) работает для меня:

 import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class MyWriter {

    Queue<String> globalQueue = new ConcurrentLinkedQueue<String>();

    AtomicBoolean isClosed1 = new AtomicBoolean(false);
    AtomicBoolean isClosed2 = new AtomicBoolean(false);


    AtomicInteger counter = new AtomicInteger(0);


    public void write(List<String> toBeWrote) {
        for (; ; ) {// a non atomic check-then-modify.
            int old_state = counter.get();
            if (isOdd(old_state)) {
                throw new ClosedGracefullyException("gracefully closed");
            }
            int new_state = old_state   2;
            if (counter.compareAndSet(old_state, new_state)) break;
        }
        doTheWrite(toBeWrote);
        int counterValue = counter.getAndAdd(-2);
        if (counterValue == 1) {
            doClose();
        }
    }

    private boolean isOdd(int old_state) {
        return old_state % 2 != 0;
    }

    private boolean isEven(int old_state) {
        return old_state % 2 == 0;
    }

    private void doTheWrite(List<String> toBeWrote) {
        for (String s : toBeWrote) {
            globalQueue.add(s);
            if (isFullyClosed()) {
                throw new ClosedUnexpectedlyException("this writer closed!!");
            }
        }
    }

    public void synchronized close() {
        int old_state = counter.get();
        if (isOdd(old_state)) {
            return;
        } else if (counter.compareAndSet(0, 1)) {
            doClose();
        } else { //even and > 0, so in middle of write
            counter.getAndAdd(1);
        }
    }

    private void doClose() {
        isClosed1.getAndSet(true);
        isClosed2.getAndSet(true);
    }

    private boolean isFullyClosed() {
        return isClosed1.get() amp;amp; isClosed2.get();
    }

    public List<String> getContents() {
        return Arrays.asList(globalQueue.toArray(new String[0]));
    }
}
 

Я использую этот модульный тест для проверки решения:

  @Test
    public void testConcurrency() throws Exception {

        int numberOfWriters = 10;
        int numberOfClosers = 3;
        final int maxSleepMillis = 10;
        int numberOfRuns = 500;

        long sumDuration = 0L;
        long start = System.currentTimeMillis();
        for (int k = 0; k < numberOfRuns; k  ) {
            myWriter = new MyWriter();
            Thread writers[] = new Thread[numberOfWriters];
            Thread closers[] = new Thread[numberOfClosers];

            final boolean[] exceptionHappened = {false};
            final int[] closedExceptionCount = {0};

            for (int i = 0; i < numberOfWriters; i  ) {
                final int finalI = i;
                writers[i] = new Thread(new Runnable() {
                    public void run() {
                        sleep(maxSleepMillis);
                        try {
                            myWriter.write(Arrays.asList("test"  finalI));
                        } catch (ClosedUnexpectedlyException e){
                            exceptionHappened[0] = true;
                        } catch (ClosedGracefullyException cwe){
                            //OK
                            closedExceptionCount[0]  ;
                        }
                    }
                });
            }

            for (int i = 0; i < numberOfClosers; i  ) {
                final int finalI = i;
                closers[i] = new Thread(new Runnable() {
                    public void run() {
                        sleep(maxSleepMillis);
                        myWriter.close();
                    }
                });
            }

            for (Thread writer : writers) {
                writer.start();
            }

            for (Thread closer : closers) {
                closer.start();
            }

            for (Thread writer : writers) {
                writer.join();
            }

            for (Thread closer : closers) {
                closer.join();
            }
            long avgDuration = (System.currentTimeMillis() - start) / (k 1);
            System.out.println(String.format("run %d, closed Ex: %d , avgDuration: %d" , k, closedExceptionCount[0], avgDuration));
            assertFalse(exceptionHappened[0]);
        }


    }

    private void sleep(int maxSleepMillis) {
        try {
            Thread.sleep((long) (Math.random() * maxSleepMillis));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
 

Комментарии:

1. Ключевая часть заключается в том,что вам нужно вставить его в цикл for, и вы получите что-то вроде: for(true){long state_state = state.get(); long new_state = … ; если(state.compareAndSet(old_state, new_new_state)) сломается;}

2. Ситуация, которую вы хотите предотвратить,-это неатомная проверка, а затем изменение. В противном случае вы окажетесь в состоянии гонки.

3. У вас также есть проблема с написанием. Представьте, что происходит, когда вызывается close; тогда вы увеличите счетчик на 2, даже если вы не входили. Поэтому я бы также сделал это, используя цикл for, как я описал в своем первом комментарии.

4. Извините, но я не понимаю, как именно использовать цикл for в моем коде, а также как проверить условие % 2 с помощью сравнения и установки.

5. для(;;}{long old_state = state.get(); если (old_state нечетное) выбросить исключение(); long new_state = old_state 2; если(state.compare_and_set(old_state;new_state)) прервется;}

Ответ №1:

Проще всего было бы синхронизировать все методы. Это предотвратит любое одновременное закрытие с записью. Это также предотвратит одновременную запись между строк.

Если вы действительно хотите одновременной записи, то просто установите флаг и позвольте авторам проверить этот флаг, как только они выйдут из метода. Таким образом, последняя нить, которая покидает здание, может выключить свет.

Общая идея: вы могли бы закодировать эту информацию в атомарном формате. Например, каждый раз, когда кто-то вводит запись, счетчик увеличивается на 2, и каждый раз, когда поток покидает метод записи, он уменьшает счетчик на 2.

Если поток хочет ввести запись и видит, что счетчик нечетный, он может вернуть или выдать исключение. Все, что тебе заблагорассудится.

Если поток завершает запись, он может уменьшить счетчик на 2, а если остается 1, это означает, что этот поток последним покидает метод записи и может завершить закрытие.

Если поток хочет закрыться, а счетчик четный и больше 0, увеличьте счетчик на 1. Если счетчик уже является нечетным, то кто-то другой уже вызвал метод close, и никаких дальнейших действий не требуется (убедитесь, что вы больше не увеличиваете значение на 1!). Если счетчик был равен нулю, то записи отсутствуют, и вы можете завершить закрытие из вызывающего потока (убедитесь, что вы присвоили значение 1, чтобы предотвратить условия гонки).

Вы можете выполнять эти передачи состояний атомарно, чтобы предотвратить любое состояние гонки с помощью https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/AtomicLong.html#compareAndSet-long-long-

Комментарии:

1. Должно ли это быть AtomicLong, или AtomicInteger тоже в порядке?

2. Также я не понимаю, как два отдельно синхронизированных метода также будут координировать друг друга? (хотя та часть, которая не позволяет мне одновременно писать, не очень хороша)

3. Это нормально, если у вас нет более 2^30 потоков 🙂

4. Если ваша логика написания действительно параллельна, то вам не нужно иметь никакой другой синхронизации для работы с логикой закрытия. Так что AtomicInteger/Long было бы достаточно.

5. Спасибо за умную общую идею, я ее протестирую, но мне тоже было любопытно узнать о синхронизированных методах

Ответ №2:

 public class MyWriter{
    private final static Lock LOCK = new ReentrantLock();
    public void write(StuffToWrite stuff){
        LOCK.lock();
        /*
        do the write here
        */
    }

    public void close(){
        /*
        do the close here
        */
        LOCK.unlock();
    }
}
 

Комментарии:

1. Это не позволит одновременной записи или многократной записи без закрытия..