#java #producer-consumer
#java — язык #производитель-потребитель #java
Вопрос:
У меня есть система, которая считывает имена из списка, вызывает внешний сервер для проверки статуса true / false и выполняет действия с именами, имеющими статус true. вызов внешнего сервера занимает некоторое время, поэтому выполнение всего этого в одном потоке не очень эффективно.
В настоящее время я пытаюсь реализовать это как систему производитель / потребитель, где множество потоков-потребителей считывают имена из списка, вызывают внешний сервер, помещают допустимые имена в очередь блокировки и имеют единственного потребителя, выбирающего элементы из очереди и применяющего к ним действие. к сожалению, однако, система иногда будет работать до завершения, а в другое время будет зависать на неопределенный срок. Тестовый код выглядит следующим образом
public class SubscriberTest {
static Queue<String> subscribed = new ConcurrentLinkedQueue<String>();
static BlockingQueue<String> valid = new LinkedBlockingQueue<String>(100);
Random rand = new Random();
public SubscriberTest(int i) {
for (int j = 0; j < i; j ) {
subscribed.add("I love:" j);
}
}
public SubscriberTest(Queue<String> subs) {
subscribed = subs;
}
public static void main(String[] args) {
SubscriberTest fun = new SubscriberTest(10000);
System.out.println(subscribed.size());
ExecutorService producers = Executors.newCachedThreadPool();
ExecutorService consumers = Executors.newSingleThreadExecutor();
Consumer consumer = fun.new Consumer();
Producer producer = fun.new Producer();
while (!subscribed.isEmpty()) {
producers.execute(producer);
consumers.execute(consumer);
}
producers.shutdown();
consumers.shutdown();
System.out.println("finally");
}
// take names from subscribed and get status
class Producer implements Runnable {
public void run() {
String x = subscribed.poll();
System.out.println("Producer: " x " " Thread.currentThread().getName());
try {
if (getStatus(x)) {
valid.put(x);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// this is a call to an external server
private boolean getStatus(String x) {
return rand.nextBoolean();
}
}
// takes names from valid queue and save them
class Consumer implements Runnable {
public void run() {
try {
System.out.println("Consumer: " valid.take() " " Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Пожалуйста, покажи мне, где я ошибаюсь.
Ответ №1:
String x = subscribed.poll();
Вернет null
, если в очереди ничего не доступно, что означает, что вы попытаетесь поместить null
в «допустимую» очередь, что вызовет исключение нулевого указателя, и этот конкретный поток завершится. Когда это произойдет со всеми потоками в пуле, приложение зависнет.
Комментарии:
1. В подписанной очереди всегда есть ненулевой материал, поэтому subscribed.poll() всегда должен возвращать допустимую строку (я думаю).
2. нет. существует условие гонки между вашим
subscribed.isEmpty()
тестом в основном потоке и фактическим производителем, удаляющим элемент (в отдельном потоке). таким образом, вы можете сгенерировать гораздо больше производителей, чем фактических элементов в вашей подписанной очереди.3. вы правы, jtalbohn. Я однажды видел NPE. Теперь я поставил нулевую проверку перед обработкой
subscribed.poll()
Однако приложение все еще зависает даже в тех случаях, когда нет NPE.
Ответ №2:
ExecutorService — это пул потоков с очередью задач. Добавление другой очереди просто усложняет и увеличивает вероятность того, что вы сделаете что-то неправильно. Я предлагаю вам просто использовать очередь, которая уже есть.
public class SubscriberTest {
public static void main(String[] args) throws InterruptedException {
final ExecutorService consumers = Executors.newSingleThreadExecutor();
// middle producer
final ExecutorService producers = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors());
// subscribed/original producer.
for (int i = 0; i < 1000*1000; i ) {
final String task = "I love:" i;
producers.execute(new MidProducer(task, consumers));
}
producers.shutdown();
producers.awaitTermination(10, TimeUnit.SECONDS);
consumers.shutdown();
System.out.println("finally");
}
static class MidProducer implements Runnable {
private final Random rand = new Random();
private final String task;
private final ExecutorService consumers;
public MidProducer(String task, ExecutorService consumers) {
this.task = task;
this.consumers = consumers;
}
public void run() {
System.out.println("Producer: " task " " Thread.currentThread().getName());
if (getStatus(task))
consumers.execute(new Consumer(task));
}
private boolean getStatus(String x) {
return rand.nextBoolean();
}
}
static class Consumer implements Runnable {
private final String task;
private Consumer(String task) {
this.task = task;
}
public void run() {
System.out.println("Consumer: " task " " Thread.currentThread().getName());
}
}
}
С принтами
Producer: I love: 1 pool-2-thread-2
Producer: I love: 3 pool-2-thread-4
Producer: I love: 2 pool-2-thread-3
Producer: I love: 5 pool-2-thread-2
Producer: I love: 7 pool-2-thread-2
Producer: I love: 4 pool-2-thread-5
Producer: I love: 6 pool-2-thread-6
Producer: I love: 8 pool-2-thread-7
Producer: I love: 10 pool-2-thread-2
Producer: I love: 9 pool-2-thread-5
Producer: I love: 11 pool-2-thread-8
Producer: I love: 12 pool-2-thread-9
Producer: I love: 14 pool-2-thread-10
Producer: I love: 13 pool-2-thread-2
Producer: I love: 16 pool-2-thread-10
Producer: I love: 15 pool-2-thread-11
Producer: I love: 17 pool-2-thread-12
Producer: I love: 20 pool-2-thread-14
Producer: I love: 19 pool-2-thread-10
Producer: I love: 18 pool-2-thread-13
Producer: I love: 0 pool-2-thread-1
Producer: I love: 22 pool-2-thread-12
Producer: I love: 21 pool-2-thread-15
Producer: I love: 25 pool-2-thread-3
Producer: I love: 27 pool-2-thread-12
Producer: I love: 26 pool-2-thread-10
Producer: I love: 24 pool-2-thread-15
Producer: I love: 28 pool-2-thread-1
Producer: I love: 23 pool-2-thread-16
Producer: I love: 31 pool-2-thread-11
Producer: I love: 30 pool-2-thread-16
Producer: I love: 32 pool-2-thread-1
Producer: I love: 36 pool-2-thread-3
Consumer: I love: 2 pool-1-thread-1
…
Consumer: I love: 9975 pool-1-thread-1
Consumer: I love: 9977 pool-1-thread-1
Consumer: I love: 9978 pool-1-thread-1
Consumer: I love: 9979 pool-1-thread-1
Consumer: I love: 9981 pool-1-thread-1
Producer: I love: 9996 pool-2-thread-16
Consumer: I love: 9984 pool-1-thread-1
Consumer: I love: 9985 pool-1-thread-1
Consumer: I love: 9990 pool-1-thread-1
Consumer: I love: 9992 pool-1-thread-1
Producer: I love: 9997 pool-2-thread-16
Consumer: I love: 9994 pool-1-thread-1
Consumer: I love: 9995 pool-1-thread-1
Consumer: I love: 9996 pool-1-thread-1
Producer: I love: 9998 pool-2-thread-16
Producer: I love: 9999 pool-2-thread-16
Consumer: I love: 9997 pool-1-thread-1
Consumer: I love: 9998 pool-1-thread-1
Consumer: I love: 9999 pool-1-thread-1
finally
Комментарии:
1. Спасибо, что нашли время взглянуть на код. Я скопировал вставленный код как есть и последовательно получаю RejectedExecutionExecptions.
2. Это потому, что потребители отключаются в основном потоке до того, как производители закончат.
3. Добавлено исправление, позволяющее потребителям не отключаться до завершения работы производителей.
4. При изменении количества задач с 10000 до 100000 (и увеличении времени ожидания) приложение по-прежнему не завершается.
5. использовал фиксированный пул потоков вместо cachedthreadpool и разобрался с ним. Спасибо.