#java #data-structures #producer-consumer
#java #структуры данных #производитель-потребитель
Вопрос:
Прямо сейчас у меня есть очередь с несколькими производителями и одним потребителем.
Работа потока потребителя выполняется медленно. Кроме того, потребитель извлекает элемент из очереди посредством операции просмотра, и пока операция потребления не будет завершена, элемент не может быть удален из очереди. Это потому, что поток производителя в качестве побочной операции также делает снимок всех элементов, которые не полностью обработаны в этот момент времени.
Теперь я хочу изменить свой код для поддержки нескольких потребителей. Итак, допустим, у меня есть три потока, один поток примет первый элемент, который может быть прочитан с помощью операции просмотра. Второй поток-потребитель может перейти ко второму элементу, но у меня нет способа получить это, поскольку queue не поддерживает получение второго элемента.
Итак, возможность использовать стандартный Concurrentlink queue (который я использую прямо сейчас) отсутствует.
Я подумываю об использовании приоритетной очереди, но тогда мне придется связать с каждым элементом флаг, который сообщает мне, используется ли этот элемент уже каким-либо потоком или нет.
Какая структура данных наиболее подходит для этой проблемы?
Ответ №1:
Похоже, у вас действительно должно быть две очереди:
- Необработанный
- Выполняется
Потребитель будет атомарно (через блокировку) извлекать из необработанной очереди и добавлять в текущую очередь. Таким образом, несколько потребителей могут работать одновременно… но производитель все еще может сделать снимок обеих очередей, когда это необходимо. Когда потребитель заканчивает с задачей, он удаляет ее из очереди выполнения. (На самом деле это не обязательно должна быть очередь, поскольку из нее ничего не «вытягивается» как таковой. Просто некоторая коллекция, которую вы можете легко добавлять и удалять.)
Учитывая, что вам потребуется блокировка, чтобы сделать передачу атомарной, вам, вероятно, не нужно, чтобы базовые очереди были параллельными — вы уже будете защищать весь общий доступ.
Ответ №2:
Я бы согласился с Джоном Скитом ( 1) в том, что вам нужны два хранилища для записи ожидающих и незавершенных элементов. Я бы использовал LinkedBlockingQueue
и попросил каждого из ваших потребителей обращаться take()
к нему. Когда элемент поступает в очередь, он будет принят одним из потребителей.
Запись того, что выполняется, и того, что завершено, была бы отдельной операцией. Я бы сохранил HashSet
все элементы, которые еще не завершены, и мой производитель сначала (атомарно) добавил бы элемент в хэш-набор незавершенных элементов, а затем поместил элемент в очередь. Как только потребитель завершил свою работу, он удаляет элемент из HashSet.
Ваш производитель может просканировать HashSet, чтобы определить, что является нерешенным.