Java Многопоточность — Производитель-потребитель с ограничениями

#java #multithreading #producer-consumer #java.util.concurrent

Вопрос:

Я пытаюсь написать решение для производителя/потребителя, но с ограничениями.

Производитель(ы) (их может быть несколько) должен прекратить работу после получения в общей сложности 5 значений, в то время как потребитель(ы) (также может быть несколько потребителей) должен остановиться после того, как все пять значений были потреблены.

Я пытался использовать AtomicInteger для этого:

 public class InternalQueue {
    
    private static final int MAX_QUEUE_SIZE = 10;
    
    private Queue<Integer> queue = new LinkedList<>();
    
    public AtomicInteger itemsProduced = new AtomicInteger(0);
    
    public AtomicInteger itemsConsumed = new AtomicInteger(0);
        
    public synchronized boolean put(int n) {
        if(itemsProduced.get() < MAX_QUEUE_SIZE) {
            queue.add(n);
            itemsProduced.addAndGet(1);
            return true;
        }
        return false;
    }
    
    
}
 

Но проблема в том, как мне сообщить потоку потребителей, когда нужно остановиться ?

Я хочу избежать исключений.

Также может быть несколько производителей и несколько потребителей.

Есть ли лучший способ сделать это ?

Комментарии:

1. Недостаточно ясно, что вы пытаетесь сделать. Если вы хотите ограничить производителей, ограничьте их, а не очередь. Вы также можете ограничить очередь, но тогда массива значений будет достаточно для базовой структуры данных. Чтобы потребитель знал, что ценности больше нет, вы можете вернуть Optional товар при получении.

2. @daniu То, что я пытаюсь сделать, — это просто остановить производителя и потребителя после определенного порога. Производители в общей сложности не должны производить более 5 ценностей, в то время как потребители не должны потреблять более 5 ценностей. Теперь все ясно ?

3. Ну, вы не показали ни производителя, ни потребителя, поэтому я не знаю, что вам сказать, как их «остановить». Логического результата при вводе должно быть достаточно, чтобы он остановился сам по себе, нет? Для потребителя неясно, хотите ли вы проводить различие между «пока нет данных» и «больше данных не будет».

4. @daniu да, для потребителей я должен различать «пока нет данных» и «больше данных не будет». Для производителей, как мне отличить «очередь заполнена» от «Мне больше не нужно производить».

Ответ №1:

Может быть, что-то вроде этой обертки BlockingQueue ?

 public class MyQueue<T> {

    private final BlockingQueue<T> queue;
    private final AtomicInteger freeSpace;


    public MyQueue(int size) {
        this.freeSpace = new AtomicInteger(size);
        this.queue = new LinkedBlockingQueue<>(size);
    }

    public boolean produce(T t) {
        if(isDone()) {
            return false;
        }
        
        freeSpace.decrementAndGet();
        return queue.offer(t);
    }

    public boolean isDone() {
        return freeSpace.get() <= 0;
    }

    public Optional<T> consume() {
        return Optional.ofNullable(queue.poll());
    }
}

 

Потребители могли бы использовать этот isDone метод или могли бы остановиться, когда Optional возвращаемое consume значение пусто. Обратите внимание, что null значения не принимаются

Комментарии:

1. produce вернется false , когда очередь будет заполнена и больше не нужно будет производить никаких товаров

2. Одна проблема. Допустим freeSpace , есть 1 Producer1 и то, и другое, и Producer2 проверьте значение isDone на истинность, они оба продолжаются, вставляются 2 значения, потребитель принимает только одно значение, и одно значение остается в очереди. В любом случае, чтобы избежать этой ошибочной вставки ?

3. Вы согласны, что единственный способ-это поставить замки put и consume ?

Ответ №2:

Вам нужен класс очереди, чтобы отслеживать количество элементов, отправленных в очередь, а также когда очередь пуста или полна. Вам также необходимо поставить запросы производителей в очередь, чтобы доступ к очереди одновременно предоставлялся только одному производителю. Следующий пример Ada демонстрирует это. Решение в Ada простое, потому что Ada ставит в очередь вызовы записей защищенных объектов, гарантируя, что только один производитель или потребитель имеет доступ в любой момент времени.

 with Ada.Text_IO; use Ada.Text_IO;

procedure Main is
   protected Limited_Buffer is
      entry Put(Item : in Integer);
      entry Get(Item : out Integer);
      function Is_New return Boolean;
      function At_Limit return Boolean;
   private
      Count : Natural := 0;
      Value : Integer;
      New_Value : Boolean := False;
   end Limited_Buffer;
   
   protected body Limited_Buffer is
      entry Put(Item : in Integer) when not New_Value and then not At_Limit is
      begin
         Value := Item;
         New_Value := True;
         Count := Count   1;
      end Put;
      entry Get(Item : out Integer) when New_Value is
      begin
         Item := Value;
         New_Value := False;
      end Get;
      function Is_New return boolean is
      begin
         return New_Value;
      end Is_New;
      
      function At_Limit return Boolean is
      begin
         return Count >= 5;
      end At_Limit;
      
   end Limited_Buffer;
   
   task P1;
   
   task body P1 is
      Num : Integer := 1;
   begin
      loop
         select
            Limited_Buffer.Put(Num);
            Num := Num   1;
         or
            delay 0.001;
         end select;
         exit when Limited_Buffer.At_Limit;
      end loop;
   end P1;
   
   task P2;
   
   task body P2 is
      Num : Integer := 10;
   begin
      loop
         select
            Limited_Buffer.Put(Num);
            Num := Num   1;
         or
            delay 0.001;
         end select;
         exit when Limited_Buffer.At_Limit;
      end loop;
   end P2;
   
   task type C1;
   
   task body C1 is
      Num : Integer;
   begin
      loop
         select
         Limited_Buffer.Get(Num);
         Put_Line(Num'Image);
         or
            delay 0.001;
         end select;
         exit when Limited_Buffer.At_Limit;
      end loop;
   end C1;
   
   Con1, Con2 : C1;
             
begin
   null;
end Main;
 

Объекты, защищенные Ada, такие как Limited_Buffer в приведенном выше примере, позволяют определять три вида методов: процедуры, записи и функции. Процедуры обеспечивают исключительную безусловную блокировку чтения-записи на защищаемом объекте. Записи обеспечивают условную блокировку чтения-записи на защищаемом объекте. Функции-это методы, доступные только для чтения, которые обеспечивают общую блокировку чтения для защищенного объекта.
В этом примере используются две записи с именами Put и Get, а также две функции с именами Is_New и At_Limit. Закрытая часть объявления защищенного объекта определяет личные данные, используемые защищенным объектом. В этом примере в защищаемом объекте содержится три элемента данных. Count-это экземпляр Natural, который является предопределенным подтипом целого числа с минимальным значением 0. В этом примере счетчик инициализируется равным 0. Значение-это экземпляр целого числа. Значение-это элемент данных, хранящий значение, записанное в защищенный объект и считанное с него записями Put и Get. New_Value-это экземпляр логического значения, инициализированного значением False. New_Value используется для обеспечения того, чтобы каждое значение, записанное в защищенный объект, использовалось ровно один раз.

Защищенное тело Limited_Buffer реализует логику для двух записей и двух функций. Ada всегда разделяет спецификации и реализации для защищаемых объектов и задач. Первая строка каждой записи имитирует спецификацию защищаемого объекта, но также добавляет граничные условия. Запись оценивается только тогда, когда граничное условие принимает значение TRUE. Каждый вызов задачей записи является очередью. Политика очередей по умолчанию-FIFO, гарантирующая, что вызовы обрабатываются в том порядке, в котором они были сделаны. Вызывающая задача приостанавливается, пока ее вызов находится в очереди на вход. Каждая запись имеет свою собственную очередь.

Далее в примере создаются две задачи производителя с именами P1 и P2. Единственное различие между этими двумя задачами заключается в диапазоне чисел, которые они записывают в очередь. Мы не хотим, чтобы производители были приостановлены в очереди ввода Put навсегда, когда очередь обработает ограничение значений. Ada предоставляет синтаксис для условного вызова очереди ввода. Этот синтаксис начинается с зарезервированного слова «выбрать». После завершения вызова ввода число локальной переменной увеличивается. Условный характер вызова начинается с зарезервированного слова «или». Входящий вызов связан с таймером. Если время истекает до завершения входного вызова, входной вызов отменяется, и цикл повторяется. Цикл завершается, когда Limited_Buffer.At_Limit возвращает значение TRUE.

Эти два потребителя идентичны. Поэтому я создал тип задачи для потребителей. Два экземпляра типа задачи создаются позже в исходном коде. Потребители неоднократно вызывают Limited_Buffer.Получите и распечатайте значение, полученное из буфера. Это действие также выполняется условно, потому что мы не хотим, чтобы потребители были приостановлены после считывания всех значений из буфера.

Наконец, создаются два экземпляра типа задачи-потребителя. Все четыре задачи начинают выполняться, когда выполнение основной процедуры достигает оператора begin в основной процедуре. Основная процедура больше ничего не делает, поэтому ее единственным исполняемым оператором является зарезервированное слово null.

Комментарии:

1. Почему вы используете Ada здесь ? Вопрос касается Java.

2. Почему нет? Список тем включает в себя как многопоточность, так и взаимодействие производителя и потребителя.

3. Проблема заключается в конечных условиях. Поскольку у одного может быть несколько производителей и несколько потребителей, конечным условием для производителя является нахождение буфера на пределе после успешной или неудачной попытки ввода, а конечным условием для потребителей является нахождение буфера на пределе после успешной или неудачной попытки получения. Выражение этих условий может быть сложным.

Ответ №3:

Посмотрите на ArrayBlockingQueue:

Это классический «ограниченный буфер», в котором массив фиксированного размера содержит элементы, вставленные производителями и извлеченные потребителями. После создания емкость не может быть изменена. Попытки поместить элемент в полную очередь приведут к блокировке операции; попытки взять элемент из пустой очереди будут аналогичным образом заблокированы.