#java #concurrency #producer-consumer
Вопрос:
У меня есть 1 производитель и 2 (несколько) потребителя.
Следующий код работает нормально(без условий гонки IMO), но проблема в том, что всякий раз, когда буфер пуст, каждый потребитель выполняет бесконечный цикл, который просто требует ресурсов.
Как я могу его оптимизировать? Я подумываю об использовании чего-то вроде уведомления потребителя в случае, если что-то будет добавлено в буфер, но возникнут проблемы с реализацией.
public class Test {
public static void main(String[] args) {
Assembly assembly = new Assembly(new ArrayList<>(), new ReentrantLock(true));
new Thread(new Runnable() {
@Override
public void run() {
assembly.consume();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
assembly.produce();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
assembly.consume();
}
}).start();
}
}
class Assembly {
List<Integer> buffer;
Lock bufferLock;
public Assembly(List<Integer> buffer, Lock bufferLock) {
this.buffer = buffer;
this.bufferLock = bufferLock;
}
public void produce() {
Integer[] nums = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 99};
Random random = new Random();
for (Integer num : nums) {
try {
bufferLock.lock();
buffer.add(num);
if (num != 99) {
System.out.println("Added: " num);
}
} finally {
bufferLock.unlock();
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
}
}
}
}
public void consume() {
while (true) {
try {
bufferLock.lock();
if (buffer.isEmpty()) {
**IS SOME OPTIMISATION POSSIBLE HERE?**
continue;
}
if (buffer.get(0).equals(99)) {
break;
}
System.out.println(
"Removed: " buffer.remove(0) " by " Thread.currentThread().getName());
} finally {
bufferLock.unlock();
}
}
}
}
Комментарии:
1. Отбросьте
List
и используйте одну из блокирующих очередей, таких какLinkedBlockingQueue
илиArrayBlockingQueue
. И в этих случаях всегда полезно сделать защитную копию любой коллекции, которую вы получаете в качестве параметра.
Ответ №1:
Используйте семафор вместо того, чтобы пытаться запустить свой собственный. Потребители будут ждать, пока «товар» будет доступен, и только один проснется, чтобы получить товар.
Классическое решение «производитель-потребитель» использует два семафора, «другой» используется производителем для ожидания, пока не появится «свободный слот». Без этого производитель может опередить способность потребителей идти в ногу со временем. Ваша программа выглядит так, как будто она имеет небольшой конечный предел производительности, так что это может быть неприменимо здесь.
Ответ №2:
Используйте a Semaphore
для добавления/удаления доступных элементов
public class ProducerConsumer {
List<Integer> buffer = new ArrayList<>();
Semaphore available = new Semaphore(0);
public static void main(String... args) {
ProducerConsumer pc = new ProducerConsumer();
new Thread(pc::consume).start();
new Thread(pc::produce).start();
new Thread(pc::consume).start();
}
public void produce() {
Integer[] nums = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 99};
Random random = new Random();
for (Integer num : nums) {
try {
synchronized (buffer) {
buffer.add(num);
}
available.release();
System.out.println("Added: " num);
} finally {
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
}
}
}
}
public void consume() {
while (true) {
try {
available.acquire();
synchronized (buffer) {
if (buffer.get(0).equals(99)) {
break;
}
System.out.println(
"Removed: " buffer.remove(0) " by " Thread.currentThread().getName());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
ваши (потребительские) темы будут заблокированы до тех пор, пока не появится новый элемент.
С выходом
Added: 1
Removed: 1 by Thread-0
Added: 2
Removed: 2 by Thread-2
Added: 3
Removed: 3 by Thread-0
...
Комментарии:
1. я изучаю параллелизм java, определенно проверял бы семафоры, но можно ли этого достичь с помощью await() и signal()?
2. повторное ожидание и сигнал. Вы не хотите сигнализировать всем потребителям, вы хотите, чтобы на самом деле проснулся только один. Производитель в настоящее время не знает, какие потребители доступны. Таким образом, вам понадобится очередь из ожидающих потребителей, и потребителю придется самому встать в очередь. К этому моменту вы более или менее реализовали семафор.
3. @Holger правильно, пример показывает только, как не проверять доступность, любая другая структура должна быть проверена на потокобезопасность, но лучший способ зависит от определения производителя/потребителя (в этом случае простое
synchronized (buffer) {
работает, но может быть недостаточно). Я все равно обновился. Примечание: Я предположил, что он заинтересован в том, чтобы знать, как оптимизировать эту строку , а не конвертировать свой код вArrayBlockingQueue
.4. @RajatAggarwal без
synchronized (buffer)
сломан.5. @RajatAggarwal ответ был обновлен с тех пор, как я написал этот комментарий. Поэтому проигнорируйте первую часть моего комментария. Только вторая часть может быть вам интересна, т. е. исходный код
ArrayBlockingQueue
содержит пример того, как решить такую проблему с помощьюReentrantLock
иawait()
иsignal()
.