#java #multithreading #locking #producer-consumer
#java #многопоточность #блокировка #производитель-потребитель
Вопрос:
Я хочу создавать h2o непрерывно тремя потоками, первый поток будет производить h, второй будет производить h, а третий должен производить o . Как я могу это сделать с помощью lock, производителя-потребителя
package com.threads.reentrantlock.consumerproducer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class H2OProducer {
static Lock lock = new ReentrantLock(true);
static Condition condition = lock.newCondition();
public static void main(String[] args) {
try {
Thread h1 = new Thread(() -> {
try {
hydrogenProducer();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread h2 = new Thread(() -> {
try {
hydrogenProducer();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread o = new Thread(() -> {
try {
hydrogenProducer();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
h1.start();
h2.start();
o.start();
try {
h1.join();
h2.join();
o.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (Exception e) {
}
}
public static void hydrogenProducer() throws InterruptedException {
try {
lock.lock();
System.out.println("h");
condition.signalAll();
} finally {
lock.unlock();
}
}
public static void oxygenProducer() throws InterruptedException {
try {
lock.lock();
System.out.println("o");
condition.signalAll();
} finally {
lock.unlock();
}
}
}
Что я делаю не так
Исключение в потоке «Thread-2» на java.lang.Исключение IllegalMonitorStateException в java.util.concurrent.блокировки.Блокировка повторного входа $Sync.tryRelease(ReentrantLock.java:151) в java.util.concurrent.блокировки.AbstractQueuedSynchronizer.release (AbstractQueuedSynchronizer.java:1261) в java.util.concurrent.блокировки.Блокировка повторного входа (ReentrantLock.java:457) в com.threads.reentrantlock.consumerproducer.H2OProducer.hydrogenProducer(H2OProducer.java:56) в com.threads.reentrantlock.consumerproducer.H2OProducer.лямбда $ 2 (H2OProducer.java:29) на java.lang.Thread.run(Thread.java:745)
Комментарии:
1. Ваш tryLock может возвращать false, чтобы сказать, что он не получил блокировку, поэтому вы не можете разблокировать его в этом случае.
2. Вы пытаетесь сделать что-то намного, намного более простое, используя только один поток. Все, что вы придумаете, будет сложным и уродливым (и медленнее). Но если вы должны это сделать, я предлагаю использовать состояние, которое определяет, какой поток может выводить следующий.
3. Вы используете
signalAll()
, но ничто не прослушивает этот сигнал.4. Я бы не стал полагаться на справедливые блокировки для обеспечения упорядочения потоков. Даже если бы это сработало, это могло быть легко нарушено кем-то, кто не понимает, почему блокировки должны быть справедливыми. Гораздо лучше сделать это явно.
Ответ №1:
Вы подаете сигнал о выполнении условия, но соответствующего ожидания нет. Кроме того, имеет место опечатка при вызове hydrogenProducer()
из обоих потоков (Thread o
и Thread h
)
Я предполагаю, что вы хотите создать два H
s перед созданием O
. Не имеет значения, создается ли two H
s одним и тем же потоком или двумя разными потоками. Я использовал randomSleep()
для демонстрации этой ситуации.
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class H2OProducer {
static final int H2O_COUNT = 1_000;
static final Random rand = new Random();
static final Lock lock = new ReentrantLock(true);
static final Condition oxzWait = lock.newCondition();
static final Condition hydWait = lock.newCondition();
static volatile int hydCount = 0;
public static void main(String[] args) {
try {
Thread h1 = new Thread(() -> {
try {
hydrogenProducer();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread h2 = new Thread(() -> {
try {
hydrogenProducer();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread o = new Thread(() -> {
try {
oxygenProducer();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
h1.setName("H1-Producer");
h2.setName("H2-Producer");
o.setName("Ox-Producer");
h1.start();
h2.start();
o.start();
try {
h1.join();
h2.join();
o.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (Exception e) {
}
}
public static void hydrogenProducer() throws InterruptedException {
for (int i = 0; i < H2O_COUNT; i ) {
lock.lock();
try {
while (hydCount == 2) {
hydWait.await();
}
hydCount ;
System.out.println(Thread.currentThread().getName() ": H produced - " i);
if (hydCount == 2) {
oxzWait.signalAll();
}
} finally {
lock.unlock();
}
randomSleep();
}
}
public static void oxygenProducer() throws InterruptedException {
for (int i = 0; i < H2O_COUNT; i ) {
lock.lock();
try {
while (hydCount < 2) {
oxzWait.await();
}
hydCount = 0;
System.out.println(Thread.currentThread().getName() ": O produced - " i);
System.out.println("");
hydWait.signalAll();
} finally {
lock.unlock();
}
randomSleep();
}
}
public static void randomSleep() {
int ms = rand.nextInt(500);
try {
Thread.sleep(ms);
} catch (InterruptedException ex) {
}
}
}
Однако, если вы хотите, чтобы каждый H
производитель создавал по одному потоку H
для каждой H2O
композиции, тогда вы можете посмотреть на CyclicBarrier. Вы также можете выполнить объединение потоков в цепочку, если вам нужно поддерживать порядок, например, T1 -> T2 -> T3 -> T1 -> T2 -> T3 ->.
Ответ №2:
Подход, использованный в приведенных выше примерах Java, заставляет производителей водорода и кислорода знать, сколько элементов необходимо произвести, до начала производства. Другой подход к проектированию позволяет централизовать информацию о том, когда следует прекратить производство, путем подсчета конечного количества молекул воды, что соответствует количеству конечных элементов производства на «фабрике воды». В реальной системе управления производством решение о том, когда прекратить производство, должно быть централизованным, а не предоставляться каждому отдельному компоненту в системе.
Следующий пример, написанный на Ada, демонстрирует этот централизованный контроль. Вместо использования сигналов, указывающих на то, что произошло производство водорода или кислорода, это решение фактически передает символические элементы водорода и кислорода от производителей потребителю, который контролирует выполнение и подсчитывает производство конечного элемента.
В решении Ada используется механизм Rendezvous, позволяющий производителям напрямую взаимодействовать с потребителем строго контролируемым образом.
Типы задач производителя определены в пакете с именем Elements. В отличие от Java, Ada обеспечивает разделение интерфейса и реализации. Интерфейс для пакета Elements определяется как:
package Elements is
type Element_Type is (Hydrogen, Oxygen);
task type Hydrogen_Producer is
Entry Stop;
Entry Get_Element(Atom : out Element_Type);
end Hydrogen_Producer;
task type Oxygen_Producer is
Entry Stop;
Entry Get_Element(Atom : out Element_Type);
end Oxygen_Producer;
end Elements;
Определение типа в верхней части спецификации интерфейса Elements определяет тип данных с именем Element_Type с двумя значениями: Водород и кислород. Определены два типа задач: одна для получения водорода, а другая для получения кислорода. Каждый из типов задач содержит две записи. Записи — это механизм, позволяющий одной задаче (или потоку) напрямую взаимодействовать с другой задачей. Остановка ввода сообщает задаче, когда следует прекратить выполнение. Запись Get_Element возвращает экземпляр элемента, созданного задачей.
Механизм рандеву автоматически синхронизирует задачу, вызывающую запись, с вызываемой задачей. Реализация типов задач показывает, как выполняется взаимодействие между задачами.
with Ada.Numerics.Float_Random; use Ada.Numerics.Float_Random;
package body Elements is
Seed : Generator;
-----------------------
-- Hydrogen_Producer --
-----------------------
task body Hydrogen_Producer is
Element : constant Element_Type := Hydrogen;
begin
loop
select
accept Stop;
exit;
or
accept Get_Element(Atom : out Element_Type) do
Atom := Element;
end Get_Element;
end select;
delay Duration(Random(Seed) * 0.1);
end loop;
end Hydrogen_Producer;
---------------------
-- Oxygen_Producer --
---------------------
task body Oxygen_Producer is
Element : constant Element_Type := Oxygen;
begin
loop
select
accept Stop;
exit;
or
accept Get_Element(Atom : out Element_Type) do
Atom := Element;
end Get_Element;
end select;
delay Duration(Random(Seed) * 0.1);
end loop;
end Oxygen_Producer;
begin
reset(Seed);
end Elements;
В теле задачи, где реализованы типы задач, объявляется переменная с именем seed. Начальная переменная является экземпляром генератора типов, определенного в пакете Ada.Numerics.Float_Random. Эта переменная будет содержать начальное значение случайного числа, используемое для генерации случайных задержек для задач производителя. Начальное значение инициализируется в нижней части файла задачи перед началом выполнения любой из задач производителя.
Две задачи точно такие же, за исключением того, что Hydrogen_Producer производит только водород, а Oxygen_Producer — только кислород. Обе задачи содержат бесконечный цикл, который прерывается только при вызове записи Stop. При вызове Stop команда exit завершает цикл. Мы также хотим иметь возможность получать данные от каждого производителя, чтобы эта роль обрабатывалась путем принятия записи Get_Element и передачи созданного элемента. Очевидно, что мы либо получим вызов Stop entry, вызов Get_Element entry, либо вызов no entry. Команда Select позволяет нашей программе обрабатывать либо Stop, либо Get_Element без предпочтения одного или другого. Что происходит, когда ни одна запись не вызывается? Производитель ожидает в блоке выбора вызова одной из записей, таким образом синхронизируя выполнение с вызывающей стороной.
Теперь нам нужен эквивалент метода «main» для создания исполняемой программы. Ada позволяет программисту называть точку входа в программу как угодно. Это не обязательно называть «main».
-----------------------------------------------------------------------
-- H2O production using 2 Hydrogen tasks and 1 Oxygen task
-----------------------------------------------------------------------
with Ada.Text_IO; use Ada.Text_IO;
with Elements; use Elements;
procedure Three_Task_H2O is
H1 : Hydrogen_Producer;
H2 : Hydrogen_Producer;
Oxy : Oxygen_Producer;
New_Atom : Element_Type;
Water_Count : natural := 0;
begin
while Water_Count < 1000 loop
H1.Get_Element(New_Atom);
H2.Get_element(New_Atom);
Oxy.Get_Element(New_Atom);
Water_Count := Water_Count 1;
if Water_Count mod 20 = 0 then
Put_Line("Water Produced:" amp; Water_Count'Image);
end if;
end loop;
H1.Stop;
H2.Stop;
Oxy.Stop;
end Three_Task_H2o;
Процедура Three_Task_H2O создает два экземпляра Hydrogen_Producer с именами H1 и H2. Это также создает экземпляр Oxygen_Producer с именем Oxy. Задачи начинают выполняться немедленно. В Java не найдено эквивалента синтаксиса thread.start.
Tree_Task_H2O выполняет цикл, пока количество молекул воды меньше 1000. Каждая итерация цикла вызывает записи Get_Element для каждого производителя. Что произойдет, если производитель не готов? В конце концов, каждый производитель подвергается случайной задержке при создании своего элемента. Результатом является то, что вызывающая (потребляющая) задача (Three_Task_H2O) приостанавливается до тех пор, пока не будет обработан каждый входной вызов.
Информация о ходе производства воды выводится каждый раз, когда производится еще 20 молекул воды. Когда создается 1000 молекул воды, цикл завершается и вызываются записи Stop для всех трех задач, завершающие каждую задачу по порядку.