как я могу оптимизировать следующий код для проблемы производителя-потребителя

#java #concurrency #producer-consumer

Вопрос:

У меня есть 1 производитель и 2 (несколько) потребителя.

Следующий код работает нормально(без условий гонки IMO), но проблема в том, что всякий раз, когда буфер пуст, каждый потребитель выполняет бесконечный цикл, который просто требует ресурсов.

Как я могу его оптимизировать? Я подумываю об использовании чего-то вроде уведомления потребителя в случае, если что-то будет добавлено в буфер, но возникнут проблемы с реализацией.

 public class Test {

  public static void main(String[] args) {
    Assembly assembly = new Assembly(new ArrayList<>(), new ReentrantLock(true));

    new Thread(new Runnable() {
      @Override
      public void run() {
        assembly.consume();
      }
    }).start();

    new Thread(new Runnable() {
      @Override
      public void run() {
        assembly.produce();
      }
    }).start();

    new Thread(new Runnable() {
      @Override
      public void run() {
        assembly.consume();
      }
    }).start();
  }
}

class Assembly {

  List<Integer> buffer;
  Lock bufferLock;

  public Assembly(List<Integer> buffer, Lock bufferLock) {
    this.buffer = buffer;
    this.bufferLock = bufferLock;
  }

  public void produce() {
    Integer[] nums = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 99};
    Random random = new Random();
    for (Integer num : nums) {
      try {
        bufferLock.lock();
        buffer.add(num);
        if (num != 99) {
          System.out.println("Added: "   num);
        }
      } finally {
        bufferLock.unlock();
        try {
          Thread.sleep(random.nextInt(1000));
        } catch (InterruptedException e) {
        }
      }
    }
  }

  public void consume() {
    while (true) {
      try {
        bufferLock.lock();
        if (buffer.isEmpty()) {
          **IS SOME OPTIMISATION POSSIBLE HERE?**
          continue;
        }
        if (buffer.get(0).equals(99)) {
          break;
        }
        System.out.println(
            "Removed: "   buffer.remove(0)   " by "   Thread.currentThread().getName());
      } finally {
        bufferLock.unlock();
      }
    }
  }
}
 

Комментарии:

1. Отбросьте List и используйте одну из блокирующих очередей, таких как LinkedBlockingQueue или ArrayBlockingQueue . И в этих случаях всегда полезно сделать защитную копию любой коллекции, которую вы получаете в качестве параметра.

Ответ №1:

Используйте семафор вместо того, чтобы пытаться запустить свой собственный. Потребители будут ждать, пока «товар» будет доступен, и только один проснется, чтобы получить товар.

Классическое решение «производитель-потребитель» использует два семафора, «другой» используется производителем для ожидания, пока не появится «свободный слот». Без этого производитель может опередить способность потребителей идти в ногу со временем. Ваша программа выглядит так, как будто она имеет небольшой конечный предел производительности, так что это может быть неприменимо здесь.

Ответ №2:

Используйте a Semaphore для добавления/удаления доступных элементов

 public class ProducerConsumer {
    List<Integer> buffer = new ArrayList<>();
    Semaphore available = new Semaphore(0);

    public static void main(String... args) {
        ProducerConsumer pc = new ProducerConsumer();
        new Thread(pc::consume).start();
        new Thread(pc::produce).start();
        new Thread(pc::consume).start();
    }

    public void produce() {
        Integer[] nums = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 99};
        Random random = new Random();
        for (Integer num : nums) {
            try {
                synchronized (buffer) {
                    buffer.add(num);
                }
                available.release();
                System.out.println("Added: "   num);
            } finally {
                try {
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                }
            }
        }
    }

    public void consume() {
        while (true) {
            try {
                available.acquire();
                synchronized (buffer) {
                    if (buffer.get(0).equals(99)) {
                        break;
                    }
                    System.out.println(
                            "Removed: "   buffer.remove(0)   " by "   Thread.currentThread().getName());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
 

ваши (потребительские) темы будут заблокированы до тех пор, пока не появится новый элемент.

С выходом

 Added: 1
Removed: 1 by Thread-0
Added: 2
Removed: 2 by Thread-2
Added: 3
Removed: 3 by Thread-0
...
 

Комментарии:

1. я изучаю параллелизм java, определенно проверял бы семафоры, но можно ли этого достичь с помощью await() и signal()?

2. повторное ожидание и сигнал. Вы не хотите сигнализировать всем потребителям, вы хотите, чтобы на самом деле проснулся только один. Производитель в настоящее время не знает, какие потребители доступны. Таким образом, вам понадобится очередь из ожидающих потребителей, и потребителю придется самому встать в очередь. К этому моменту вы более или менее реализовали семафор.

3. @Holger правильно, пример показывает только, как не проверять доступность, любая другая структура должна быть проверена на потокобезопасность, но лучший способ зависит от определения производителя/потребителя (в этом случае простое synchronized (buffer) { работает, но может быть недостаточно). Я все равно обновился. Примечание: Я предположил, что он заинтересован в том, чтобы знать, как оптимизировать эту строку , а не конвертировать свой код в ArrayBlockingQueue .

4. @RajatAggarwal без synchronized (buffer) сломан.

5. @RajatAggarwal ответ был обновлен с тех пор, как я написал этот комментарий. Поэтому проигнорируйте первую часть моего комментария. Только вторая часть может быть вам интересна, т. е. исходный код ArrayBlockingQueue содержит пример того, как решить такую проблему с помощью ReentrantLock и await() и signal() .