блокировка чтения и записи java, позволяющая выполнять несколько операций записи

#java

#java

Вопрос:

В распределенной компьютерной системе (не связанной с жестким диском) у меня есть специальный сценарий:

  1. Операции типа A — могут выполняться параллельно
  2. Операции типа B — могут выполняться параллельно
  3. Операции типа A не могут выполняться параллельно с операциями типа B.
  4. Операции типа B выполняются очень медленно — следовательно, в качестве оптимизации я бы хотел, чтобы они имели более высокий приоритет, но это не является строго необходимым

В качестве быстрого решения я использовал блокировку чтения / записи. Но, поскольку «запись» блокируется всякий раз, когда происходит другая текущая «запись», этого недостаточно.

Я подумал о том, чтобы включить собственную блокировку чтения / записи. Возможно, есть какая-то библиотека, которую я пропустил? Любые другие предложения?

Комментарии:

1. параллельные операции чтения / записи выполняются медленнее, чем выполнение их по одному за раз, ваш жесткий диск имеет только одну головку, и его перемещение больше, чем нужно, только замедлит ваше чтение / запись.

2. Похоже , что вам придется реализовать свой собственный Lock . В таком случае начните с AbstractQueuedSynchronizer урока. Это класс, который используют реализации блокировок и синхронизаторов.

3. @RIVERMAN2010 эти операции не связаны с жесткими дисками; оптимизация для «записи» — это просто улучшение, а не обязательное условие. Я отредактирую свой вопрос.

Ответ №1:

 import static java.util.concurrent.Executors.*;

import java.util.concurrent.*;

import lombok.*;
import lombok.extern.slf4j.*;

/**
 * Based on
 * http://tutorials.jenkov.com/java-concurrency/read-write-locks.html#simple
 * 
 * <p>
 * Allows for multiple writers.
 * </p>
 */
@Slf4j
public class ReadMultiWriteLock {

    private static final ReadMultiWriteLock lock    = new ReadMultiWriteLock();
    private int                             readers = 0;
    private int                             writers = 0;

    /**
     * Guards specified critical section for reading purposes.
     *
     * @param criticalSection
     *            the critical section
     * @throws Throwable
     *             if this thread was interrupted or any other exception thrown
     *             from critical section
     * @return value returned by critical section
     */
    @SneakyThrows
    public static <T> T readLocking(final Callable<T> criticalSection) {
        lock.readingAquire();
        try {
            return criticalSection.call();
        } finally {
            lock.readingRelease();
        }
    }

    /**
     * Guards specified critical section for reading purposes.
     *
     * @param criticalSection
     *            the critical section
     * @throws Throwable
     *             if this thread was interrupted or any other exception thrown
     *             from critical section
     * @return always {@code null}
     */
    public static void readLocking(final Runnable criticalSection) {
        readLocking(callable(criticalSection));
    }

    /**
     * Guards specified critical section for writing purposes.
     *
     * @param criticalSection
     *            the critical section
     * @throws Throwable
     *             if this thread was interrupted or any other exception thrown
     *             from critical section
     * @return value returned by critical section
     */
    @SneakyThrows
    public static <T> T writeLocking(final Callable<T> criticalSection) {
        lock.writingAcquire();
        try {
            return criticalSection.call();
        } finally {
            lock.writingRelease();
        }
    }

    /**
     * Guards specified critical section for writing purposes.
     *
     * @param criticalSection
     *            the critical section
     * @throws Throwable
     *             if this thread was interrupted or any other exception thrown
     *             from critical section
     * @return always {@code null}
     */
    public static void writeLocking(final Runnable criticalSection) {
        writeLocking(callable(criticalSection));
    }

    /**
     * Waits for writers to finish and accounts for another reader lock.
     * 
     * @throws InterruptedException
     *             if this thread was interrupted
     */
    public synchronized void readingAquire() throws InterruptedException {
        while (writers > 0) {
            log.trace("blocking read -- {} writers running", writers);
            wait();
        }
        readers  ;
        log.trace("aquired {} reading locks", readers);
    }

    /**
     * Accounts for one less reader lock.
     */
    public synchronized void readingRelease() {
        readers--;
        notifyAll();
    }

    /**
     * Waits for readers to finish and accounts for another writer lock.
     * 
     * @throws InterruptedException
     *             if this thread was interrupted
     */
    public synchronized void writingAcquire() throws InterruptedException {
        while (readers > 0) {
            log.trace("blocking write -- {} readers running", readers);
            wait();
        }
        writers  ;
        log.trace("aquired {} writing locks", writers);
    }

    /**
     * Accounts for one less writer lock.
     */
    public synchronized void writingRelease() throws InterruptedException {
        writers--;
        notifyAll();
    }
}
 

и тест:

 import java.util.concurrent.*;

import org.testng.annotations.*;

import lombok.*;
import lombok.extern.slf4j.*;

@Slf4j
public class ReadMultiWriteLockTest {
    private static final class State {
        private enum Status {
            STARTED, ENDED
        }

        private static final int MAX_DELAY_MS = 10;

        private Status           readStatus, writeStatus;

        static int randomDelay(final int maxMs) {
            return ThreadLocalRandom.current().nextInt(0, maxMs);
        }

        static boolean randomReadWrite() {
            return ThreadLocalRandom.current().nextBoolean();
        }

        int read(final int id) {
            if (Status.STARTED == writeStatus)
                throw new IllegalStateException("other thread is writing");
            readStatus = Status.STARTED;
            log.trace(">>> start reading {}", id);
            sleep(randomDelay(MAX_DELAY_MS));
            log.trace("<<< end reading {}", id);
            readStatus = Status.ENDED;
            return id;
        }

        int write(final int id) {
            if (Status.STARTED == readStatus)
                throw new IllegalStateException("other thread is reading");
            writeStatus = Status.STARTED;
            log.trace(">>> start writing {}", id);
            sleep(randomDelay(MAX_DELAY_MS));
            log.trace("<<< end writing {}", id);
            writeStatus = Status.ENDED;
            return id;
        }
    }

    private static final ParallelLoop PARALLEL_LOOP = ParallelLoop.INSTANCE
        .withParallelism(100)
        // NOTE: when running with trace may take long time
        .withRepetitions(200_000);

    @Test
    public void shouldExclusivlyLockForReadsAndWrites() {
        val sharedState = new State();
        PARALLEL_LOOP
            .run(id -> State.randomReadWrite()
                ? writeLocking(() -> sharedState.write(id))
                : readLocking(() -> sharedState.read(id)));
    }

    @Test(expectedExceptions = IllegalStateException.class)
    public void shouldFailIfNeitherLocked() {
        val sharedState = new State();
        PARALLEL_LOOP
            .run(id -> State.randomReadWrite()
                ? sharedState.write(id)
                : sharedState.read(id));
    }

    @Test(expectedExceptions = IllegalStateException.class)
    public void shouldFailIfNotReadLocked() {
        val sharedState = new State();
        PARALLEL_LOOP
            .run(id -> State.randomReadWrite()
                ? writeLocking(() -> sharedState.write(id))
                : sharedState.read(id));
    }

    @Test(expectedExceptions = IllegalStateException.class)
    public void shouldFailIfNotWriteLocked() {
        val sharedState = new State();
        PARALLEL_LOOP
            .run(id -> State.randomReadWrite()
                ? sharedState.write(id)
                : readLocking(() -> sharedState.read(id)));
    }

    @Test(expectedExceptions = RuntimeException.class)
    public void shouldExecuteReadingCriticalSectionWithoutValue() {
        readLocking((Runnable) () -> {
            throw new RuntimeException("reading critical section executed");
        });
    }

    @Test(expectedExceptions = RuntimeException.class)
    public void shouldExecuteWritingCriticalSectionWithoutValue() {
        writeLocking((Runnable) () -> {
            throw new RuntimeException("writing critical section executed");
        });
    }
}
 

и параллельный цикл утилиты:

 import java.util.function.*;
import java.util.stream.*;

import lombok.*;
import lombok.extern.slf4j.*;

/**
 * Parallel looping with specified threads, repetitions and block of code.
 */
@Slf4j
@AllArgsConstructor
public final class ParallelLoop {
    /**
     * The default parallel loop; chain with {@link #parallelism} and
     * {@link #repetitions} to configure.
     */
    public static final ParallelLoop INSTANCE = new ParallelLoop();
    @With
    private final int                parallelism;
    @With
    private final int                repetitions;

    /**
     * Constructs a default parallel loop with one thread and one repetition.
     */
    public ParallelLoop() {
        parallelism = 1;
        repetitions = 1;
    }

    /**
     * Runs specified function in configured loop.
     * 
     * @param function
     *            the function to run; is called with the run identifier and
     *            expected to return it
     */
    public void run(final Function<Integer, Integer> function) {
        System.setProperty(
            "java.util.concurrent.ForkJoinPool.common.parallelism",
            String.valueOf(parallelism));
        IntStream.range(0, repetitions)
            .parallel()
            .forEach(id -> log.trace("run id {}", function.apply(id)));
    }

    /**
     * Runs specified consumer in configure loop.
     * 
     * @param consumer
     *            the consumer to run; is called with the run identifier.
     */
    public void run(final IntConsumer consumer) {
        run(id -> {
            consumer.accept(id);
            return id;
        });
    }
}