java-производитель-потребитель не всегда завершается

#java #producer-consumer

#java — язык #производитель-потребитель #java

Вопрос:

У меня есть система, которая считывает имена из списка, вызывает внешний сервер для проверки статуса true / false и выполняет действия с именами, имеющими статус true. вызов внешнего сервера занимает некоторое время, поэтому выполнение всего этого в одном потоке не очень эффективно.

В настоящее время я пытаюсь реализовать это как систему производитель / потребитель, где множество потоков-потребителей считывают имена из списка, вызывают внешний сервер, помещают допустимые имена в очередь блокировки и имеют единственного потребителя, выбирающего элементы из очереди и применяющего к ним действие. к сожалению, однако, система иногда будет работать до завершения, а в другое время будет зависать на неопределенный срок. Тестовый код выглядит следующим образом

 public class SubscriberTest {
    static Queue<String> subscribed = new ConcurrentLinkedQueue<String>();
    static BlockingQueue<String> valid = new LinkedBlockingQueue<String>(100);
    Random rand = new Random();

    public SubscriberTest(int i) {
        for (int j = 0; j < i; j  ) {
            subscribed.add("I love:"   j);
        }
    }

    public SubscriberTest(Queue<String> subs) {
        subscribed = subs;
    }

    public static void main(String[] args) {
        SubscriberTest fun = new SubscriberTest(10000);
        System.out.println(subscribed.size());
        ExecutorService producers = Executors.newCachedThreadPool();
        ExecutorService consumers = Executors.newSingleThreadExecutor();
        Consumer consumer = fun.new Consumer();
        Producer producer = fun.new Producer();
        while (!subscribed.isEmpty()) {
            producers.execute(producer);
            consumers.execute(consumer);
        }
        producers.shutdown();
        consumers.shutdown();
        System.out.println("finally");
    }

    // take names from subscribed and get status
    class Producer implements Runnable {
        public void run() {

            String x = subscribed.poll();
            System.out.println("Producer: "   x   " "   Thread.currentThread().getName());
            try {
                if (getStatus(x)) {
                    valid.put(x);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // this is a call to an external server
        private boolean getStatus(String x) {
            return rand.nextBoolean();
        }
    }

    // takes names from valid queue and save them
    class Consumer implements Runnable {

        public void run() {
            try {
                System.out.println("Consumer: "   valid.take()   " "   Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

    }
}
  

Пожалуйста, покажи мне, где я ошибаюсь.

Ответ №1:

 String x = subscribed.poll();
  

Вернет null , если в очереди ничего не доступно, что означает, что вы попытаетесь поместить null в «допустимую» очередь, что вызовет исключение нулевого указателя, и этот конкретный поток завершится. Когда это произойдет со всеми потоками в пуле, приложение зависнет.

Комментарии:

1. В подписанной очереди всегда есть ненулевой материал, поэтому subscribed.poll() всегда должен возвращать допустимую строку (я думаю).

2. нет. существует условие гонки между вашим subscribed.isEmpty() тестом в основном потоке и фактическим производителем, удаляющим элемент (в отдельном потоке). таким образом, вы можете сгенерировать гораздо больше производителей, чем фактических элементов в вашей подписанной очереди.

3. вы правы, jtalbohn. Я однажды видел NPE. Теперь я поставил нулевую проверку перед обработкой subscribed.poll() Однако приложение все еще зависает даже в тех случаях, когда нет NPE.

Ответ №2:

ExecutorService — это пул потоков с очередью задач. Добавление другой очереди просто усложняет и увеличивает вероятность того, что вы сделаете что-то неправильно. Я предлагаю вам просто использовать очередь, которая уже есть.

 public class SubscriberTest {
    public static void main(String[] args) throws InterruptedException {
        final ExecutorService consumers = Executors.newSingleThreadExecutor();
        // middle producer
    final ExecutorService producers = Executors.newFixedThreadPool(
                                   Runtime.getRuntime().availableProcessors());
    // subscribed/original producer.
    for (int i = 0; i < 1000*1000; i  ) {
            final String task = "I love:"   i;
            producers.execute(new MidProducer(task, consumers));
        }

        producers.shutdown();
        producers.awaitTermination(10, TimeUnit.SECONDS);
        consumers.shutdown();
        System.out.println("finally");
    }


    static class MidProducer implements Runnable {
        private final Random rand = new Random();
        private final String task;
        private final ExecutorService consumers;

        public MidProducer(String task, ExecutorService consumers) {
            this.task = task;
            this.consumers = consumers;
        }

        public void run() {
            System.out.println("Producer: "   task   " "   Thread.currentThread().getName());

            if (getStatus(task))
                consumers.execute(new Consumer(task));
        }

        private boolean getStatus(String x) {
            return rand.nextBoolean();
        }

    }

    static class Consumer implements Runnable {
        private final String task;

        private Consumer(String task) {
            this.task = task;
        }

        public void run() {
            System.out.println("Consumer: "   task   " "   Thread.currentThread().getName());
        }
    }
}
  

С принтами

 Producer: I love: 1 pool-2-thread-2
Producer: I love: 3 pool-2-thread-4
Producer: I love: 2 pool-2-thread-3
Producer: I love: 5 pool-2-thread-2
Producer: I love: 7 pool-2-thread-2
Producer: I love: 4 pool-2-thread-5
Producer: I love: 6 pool-2-thread-6
Producer: I love: 8 pool-2-thread-7
Producer: I love: 10 pool-2-thread-2
Producer: I love: 9 pool-2-thread-5
Producer: I love: 11 pool-2-thread-8
Producer: I love: 12 pool-2-thread-9
Producer: I love: 14 pool-2-thread-10
Producer: I love: 13 pool-2-thread-2
Producer: I love: 16 pool-2-thread-10
Producer: I love: 15 pool-2-thread-11
Producer: I love: 17 pool-2-thread-12
Producer: I love: 20 pool-2-thread-14
Producer: I love: 19 pool-2-thread-10
Producer: I love: 18 pool-2-thread-13
Producer: I love: 0 pool-2-thread-1
Producer: I love: 22 pool-2-thread-12
Producer: I love: 21 pool-2-thread-15
Producer: I love: 25 pool-2-thread-3
Producer: I love: 27 pool-2-thread-12
Producer: I love: 26 pool-2-thread-10
Producer: I love: 24 pool-2-thread-15
Producer: I love: 28 pool-2-thread-1
Producer: I love: 23 pool-2-thread-16
Producer: I love: 31 pool-2-thread-11
Producer: I love: 30 pool-2-thread-16
Producer: I love: 32 pool-2-thread-1
Producer: I love: 36 pool-2-thread-3
Consumer: I love: 2 pool-1-thread-1
  

 Consumer: I love: 9975 pool-1-thread-1
Consumer: I love: 9977 pool-1-thread-1
Consumer: I love: 9978 pool-1-thread-1
Consumer: I love: 9979 pool-1-thread-1
Consumer: I love: 9981 pool-1-thread-1
Producer: I love: 9996 pool-2-thread-16
Consumer: I love: 9984 pool-1-thread-1
Consumer: I love: 9985 pool-1-thread-1
Consumer: I love: 9990 pool-1-thread-1
Consumer: I love: 9992 pool-1-thread-1
Producer: I love: 9997 pool-2-thread-16
Consumer: I love: 9994 pool-1-thread-1
Consumer: I love: 9995 pool-1-thread-1
Consumer: I love: 9996 pool-1-thread-1
Producer: I love: 9998 pool-2-thread-16
Producer: I love: 9999 pool-2-thread-16
Consumer: I love: 9997 pool-1-thread-1
Consumer: I love: 9998 pool-1-thread-1
Consumer: I love: 9999 pool-1-thread-1
finally
  

Комментарии:

1. Спасибо, что нашли время взглянуть на код. Я скопировал вставленный код как есть и последовательно получаю RejectedExecutionExecptions.

2. Это потому, что потребители отключаются в основном потоке до того, как производители закончат.

3. Добавлено исправление, позволяющее потребителям не отключаться до завершения работы производителей.

4. При изменении количества задач с 10000 до 100000 (и увеличении времени ожидания) приложение по-прежнему не завершается.

5. использовал фиксированный пул потоков вместо cachedthreadpool и разобрался с ним. Спасибо.