Выбор структуры данных для варианта проблемы производителя-потребителя

#java #data-structures #producer-consumer

#java #структуры данных #производитель-потребитель

Вопрос:

Прямо сейчас у меня есть очередь с несколькими производителями и одним потребителем.

Работа потока потребителя выполняется медленно. Кроме того, потребитель извлекает элемент из очереди посредством операции просмотра, и пока операция потребления не будет завершена, элемент не может быть удален из очереди. Это потому, что поток производителя в качестве побочной операции также делает снимок всех элементов, которые не полностью обработаны в этот момент времени.

Теперь я хочу изменить свой код для поддержки нескольких потребителей. Итак, допустим, у меня есть три потока, один поток примет первый элемент, который может быть прочитан с помощью операции просмотра. Второй поток-потребитель может перейти ко второму элементу, но у меня нет способа получить это, поскольку queue не поддерживает получение второго элемента.

Итак, возможность использовать стандартный Concurrentlink queue (который я использую прямо сейчас) отсутствует.

Я подумываю об использовании приоритетной очереди, но тогда мне придется связать с каждым элементом флаг, который сообщает мне, используется ли этот элемент уже каким-либо потоком или нет.

Какая структура данных наиболее подходит для этой проблемы?

Ответ №1:

Похоже, у вас действительно должно быть две очереди:

  • Необработанный
  • Выполняется

Потребитель будет атомарно (через блокировку) извлекать из необработанной очереди и добавлять в текущую очередь. Таким образом, несколько потребителей могут работать одновременно… но производитель все еще может сделать снимок обеих очередей, когда это необходимо. Когда потребитель заканчивает с задачей, он удаляет ее из очереди выполнения. (На самом деле это не обязательно должна быть очередь, поскольку из нее ничего не «вытягивается» как таковой. Просто некоторая коллекция, которую вы можете легко добавлять и удалять.)

Учитывая, что вам потребуется блокировка, чтобы сделать передачу атомарной, вам, вероятно, не нужно, чтобы базовые очереди были параллельными — вы уже будете защищать весь общий доступ.

Ответ №2:

Я бы согласился с Джоном Скитом ( 1) в том, что вам нужны два хранилища для записи ожидающих и незавершенных элементов. Я бы использовал LinkedBlockingQueue и попросил каждого из ваших потребителей обращаться take() к нему. Когда элемент поступает в очередь, он будет принят одним из потребителей.

Запись того, что выполняется, и того, что завершено, была бы отдельной операцией. Я бы сохранил HashSet все элементы, которые еще не завершены, и мой производитель сначала (атомарно) добавил бы элемент в хэш-набор незавершенных элементов, а затем поместил элемент в очередь. Как только потребитель завершил свою работу, он удаляет элемент из HashSet.

Ваш производитель может просканировать HashSet, чтобы определить, что является нерешенным.