Одновременный поиск и удаление элемента из коллекции или ожидание

#java #multithreading

#java #многопоточность

Вопрос:

 public class MyClass {
    private List<Integer> resources = new ArrayList<>();

    public synchronized Integer getAndRemoveResourceOrWait(Integer requestedResource) throws InterruptedException {
        while(resources.stream().anyMatch((r) -> { return r >= requestedResource; })) {
            wait();
        }
        Integer found = resources.stream().findFirst((r) -> {
            return r >= requestedResource;
        }).get();
        resources.remove(found);
        return found;
    }

    public void addResource(Integer resource) {
        resources.add(resource);
        notifyAll();
    }
}
  

Поток «A» эпизодически вызывает addResource со случайным значением.
Несколько других потоков активно вызывают getAndRemoveResourceOrWait.

Что мне нужно сделать, чтобы метод getAndRemoveResourceOrWait работал одновременно?

Например, поток «X» вызывает getAndRemoveResourceOrWait с переменной 128, которая не существует в коллекции ресурсов. Итак, он становится ожидающим этого. Во время ожидания поток «Y» вызывает getAndRemoveResourceOrWait с переменной 64, и она существует в коллекции ресурсов. Поток «Y» не должен ждать завершения потока «X».

Ответ №1:

Что мне нужно сделать, чтобы метод getAndRemoveResourceOrWait работал одновременно?

Его просто нужно запустить в потоке, отличном от того, который вызывает addResource(resource) .

Обратите внимание, что getAndRemoveResource это блокирующая (синхронная) операция в том смысле, что поток, выполняющий вызов, блокируется до тех пор, пока не получит ответ. Однако один вызывающий поток getAndRemoveResource не блокирует вызов другого потока getAndRemoveResource . Ключ в том, что wait() вызов освобождает мьютекс, а затем повторно запрашивает его при получении уведомления о мьютексе. Что здесь произойдет, так это то, что notifyAll вызовет запуск всех ожидающих потоков по одному за раз.

Однако в вашем addResource методе есть ошибка. Метод должен быть объявлен как synchronized . Если вы не вызовете, notifyAll() пока текущий поток содержит мьютекс для on this , вы получите исключение. (И это также необходимо для обеспечения видимости обновлений общего resources объекта… в обоих направлениях.)

Кроме того, эта реализация не будет хорошо масштабироваться:

  • Каждый ожидающий поток будет сканировать весь список ресурсов при каждом обновлении; т.е. при каждом вызове addResource .
  • Когда ожидающий поток находит ресурс, он еще дважды просканирует список, чтобы удалить его.
  • Все это выполняется при удержании мьютекса в совместно используемом MyClass экземпляре … который также блокируется addResource .

ОБНОВИТЬ — Предполагая, что Resource значения уникальны, лучшим решением было бы использовать замену ArrayList на TreeSet . Это должно сработать:

 public class MyClass {
    private TreetSet<Integer> resources = new TreeSet<>();

    public synchronized Integer getAndRemoveResourceOrWait(
            Integer resource) throws InterruptedException {
        while (true) {
            Integer found = resources.tailSet(resource, true).pollFirst();
            if (found != null) {
                return found;
            }
            wait();
        }
    }

    public synchronized void addResource(Integer resource) {
        resources.add(resource);
        notifyAll();
    }
}
  

(Я также пытался ConcurrentSkipListSet , но я не смог найти способ избежать использования мьютекса при добавлении и удалении. Если бы вы пытались удалить равный ресурс, это можно было бы сделать …)