#java
#java
Вопрос:
В распределенной компьютерной системе (не связанной с жестким диском) у меня есть специальный сценарий:
- Операции типа A — могут выполняться параллельно
- Операции типа B — могут выполняться параллельно
- Операции типа A не могут выполняться параллельно с операциями типа B.
- Операции типа B выполняются очень медленно — следовательно, в качестве оптимизации я бы хотел, чтобы они имели более высокий приоритет, но это не является строго необходимым
В качестве быстрого решения я использовал блокировку чтения / записи. Но, поскольку «запись» блокируется всякий раз, когда происходит другая текущая «запись», этого недостаточно.
Я подумал о том, чтобы включить собственную блокировку чтения / записи. Возможно, есть какая-то библиотека, которую я пропустил? Любые другие предложения?
Комментарии:
1. параллельные операции чтения / записи выполняются медленнее, чем выполнение их по одному за раз, ваш жесткий диск имеет только одну головку, и его перемещение больше, чем нужно, только замедлит ваше чтение / запись.
2. Похоже , что вам придется реализовать свой собственный
Lock
. В таком случае начните сAbstractQueuedSynchronizer
урока. Это класс, который используют реализации блокировок и синхронизаторов.3. @RIVERMAN2010 эти операции не связаны с жесткими дисками; оптимизация для «записи» — это просто улучшение, а не обязательное условие. Я отредактирую свой вопрос.
Ответ №1:
import static java.util.concurrent.Executors.*;
import java.util.concurrent.*;
import lombok.*;
import lombok.extern.slf4j.*;
/**
* Based on
* http://tutorials.jenkov.com/java-concurrency/read-write-locks.html#simple
*
* <p>
* Allows for multiple writers.
* </p>
*/
@Slf4j
public class ReadMultiWriteLock {
private static final ReadMultiWriteLock lock = new ReadMultiWriteLock();
private int readers = 0;
private int writers = 0;
/**
* Guards specified critical section for reading purposes.
*
* @param criticalSection
* the critical section
* @throws Throwable
* if this thread was interrupted or any other exception thrown
* from critical section
* @return value returned by critical section
*/
@SneakyThrows
public static <T> T readLocking(final Callable<T> criticalSection) {
lock.readingAquire();
try {
return criticalSection.call();
} finally {
lock.readingRelease();
}
}
/**
* Guards specified critical section for reading purposes.
*
* @param criticalSection
* the critical section
* @throws Throwable
* if this thread was interrupted or any other exception thrown
* from critical section
* @return always {@code null}
*/
public static void readLocking(final Runnable criticalSection) {
readLocking(callable(criticalSection));
}
/**
* Guards specified critical section for writing purposes.
*
* @param criticalSection
* the critical section
* @throws Throwable
* if this thread was interrupted or any other exception thrown
* from critical section
* @return value returned by critical section
*/
@SneakyThrows
public static <T> T writeLocking(final Callable<T> criticalSection) {
lock.writingAcquire();
try {
return criticalSection.call();
} finally {
lock.writingRelease();
}
}
/**
* Guards specified critical section for writing purposes.
*
* @param criticalSection
* the critical section
* @throws Throwable
* if this thread was interrupted or any other exception thrown
* from critical section
* @return always {@code null}
*/
public static void writeLocking(final Runnable criticalSection) {
writeLocking(callable(criticalSection));
}
/**
* Waits for writers to finish and accounts for another reader lock.
*
* @throws InterruptedException
* if this thread was interrupted
*/
public synchronized void readingAquire() throws InterruptedException {
while (writers > 0) {
log.trace("blocking read -- {} writers running", writers);
wait();
}
readers ;
log.trace("aquired {} reading locks", readers);
}
/**
* Accounts for one less reader lock.
*/
public synchronized void readingRelease() {
readers--;
notifyAll();
}
/**
* Waits for readers to finish and accounts for another writer lock.
*
* @throws InterruptedException
* if this thread was interrupted
*/
public synchronized void writingAcquire() throws InterruptedException {
while (readers > 0) {
log.trace("blocking write -- {} readers running", readers);
wait();
}
writers ;
log.trace("aquired {} writing locks", writers);
}
/**
* Accounts for one less writer lock.
*/
public synchronized void writingRelease() throws InterruptedException {
writers--;
notifyAll();
}
}
и тест:
import java.util.concurrent.*;
import org.testng.annotations.*;
import lombok.*;
import lombok.extern.slf4j.*;
@Slf4j
public class ReadMultiWriteLockTest {
private static final class State {
private enum Status {
STARTED, ENDED
}
private static final int MAX_DELAY_MS = 10;
private Status readStatus, writeStatus;
static int randomDelay(final int maxMs) {
return ThreadLocalRandom.current().nextInt(0, maxMs);
}
static boolean randomReadWrite() {
return ThreadLocalRandom.current().nextBoolean();
}
int read(final int id) {
if (Status.STARTED == writeStatus)
throw new IllegalStateException("other thread is writing");
readStatus = Status.STARTED;
log.trace(">>> start reading {}", id);
sleep(randomDelay(MAX_DELAY_MS));
log.trace("<<< end reading {}", id);
readStatus = Status.ENDED;
return id;
}
int write(final int id) {
if (Status.STARTED == readStatus)
throw new IllegalStateException("other thread is reading");
writeStatus = Status.STARTED;
log.trace(">>> start writing {}", id);
sleep(randomDelay(MAX_DELAY_MS));
log.trace("<<< end writing {}", id);
writeStatus = Status.ENDED;
return id;
}
}
private static final ParallelLoop PARALLEL_LOOP = ParallelLoop.INSTANCE
.withParallelism(100)
// NOTE: when running with trace may take long time
.withRepetitions(200_000);
@Test
public void shouldExclusivlyLockForReadsAndWrites() {
val sharedState = new State();
PARALLEL_LOOP
.run(id -> State.randomReadWrite()
? writeLocking(() -> sharedState.write(id))
: readLocking(() -> sharedState.read(id)));
}
@Test(expectedExceptions = IllegalStateException.class)
public void shouldFailIfNeitherLocked() {
val sharedState = new State();
PARALLEL_LOOP
.run(id -> State.randomReadWrite()
? sharedState.write(id)
: sharedState.read(id));
}
@Test(expectedExceptions = IllegalStateException.class)
public void shouldFailIfNotReadLocked() {
val sharedState = new State();
PARALLEL_LOOP
.run(id -> State.randomReadWrite()
? writeLocking(() -> sharedState.write(id))
: sharedState.read(id));
}
@Test(expectedExceptions = IllegalStateException.class)
public void shouldFailIfNotWriteLocked() {
val sharedState = new State();
PARALLEL_LOOP
.run(id -> State.randomReadWrite()
? sharedState.write(id)
: readLocking(() -> sharedState.read(id)));
}
@Test(expectedExceptions = RuntimeException.class)
public void shouldExecuteReadingCriticalSectionWithoutValue() {
readLocking((Runnable) () -> {
throw new RuntimeException("reading critical section executed");
});
}
@Test(expectedExceptions = RuntimeException.class)
public void shouldExecuteWritingCriticalSectionWithoutValue() {
writeLocking((Runnable) () -> {
throw new RuntimeException("writing critical section executed");
});
}
}
и параллельный цикл утилиты:
import java.util.function.*;
import java.util.stream.*;
import lombok.*;
import lombok.extern.slf4j.*;
/**
* Parallel looping with specified threads, repetitions and block of code.
*/
@Slf4j
@AllArgsConstructor
public final class ParallelLoop {
/**
* The default parallel loop; chain with {@link #parallelism} and
* {@link #repetitions} to configure.
*/
public static final ParallelLoop INSTANCE = new ParallelLoop();
@With
private final int parallelism;
@With
private final int repetitions;
/**
* Constructs a default parallel loop with one thread and one repetition.
*/
public ParallelLoop() {
parallelism = 1;
repetitions = 1;
}
/**
* Runs specified function in configured loop.
*
* @param function
* the function to run; is called with the run identifier and
* expected to return it
*/
public void run(final Function<Integer, Integer> function) {
System.setProperty(
"java.util.concurrent.ForkJoinPool.common.parallelism",
String.valueOf(parallelism));
IntStream.range(0, repetitions)
.parallel()
.forEach(id -> log.trace("run id {}", function.apply(id)));
}
/**
* Runs specified consumer in configure loop.
*
* @param consumer
* the consumer to run; is called with the run identifier.
*/
public void run(final IntConsumer consumer) {
run(id -> {
consumer.accept(id);
return id;
});
}
}