#java #multithreading #queue
#java #многопоточность #очередь
Вопрос:
В моем Java-приложении несколько потоков помещают данные в очередь, из которой другой поток (только один) принимает объекты и отправляет их.
Иногда поток-потребитель, похоже, не замечает, что в очередь добавлены новые элементы, поскольку сообщения журнала, указывающие на опрос, перестают появляться. Сообщения журнала из производящих потоков указывают на то, что эти элементы действительно поступают. Погуглив немного, я понял, что это, кажется, известно как «пропущенный сигнал». Поскольку я не wait
разрабатываю и не использую блокировки, я не уверен, применимо ли это ко мне.
Что меня действительно озадачивает, так это то, что когда я прерываю поток-потребитель, он обрабатывает все элементы в очереди, а затем снова замолкает, даже не выходя.
Вот основной цикл потока-потребителя, потоки-производители выполняют такие вещи, как чтение из сокетов, и их единственное общее свойство заключается в том, что они используют add()
очередь, из которой опрашивается потребитель.
public class FieldFrontService
{
// ...
private ConcurrentLinkedQueue<Transmission> _qRec;
// ...
private Runnable _createReceptionist()
{
return new Runnable() {
@Override
public void run()
{
while (!Thread.currentThread().isInterrupted()) {
Transmission tx = FieldFrontService.this._qRec.poll();
if (null == tx) {
try {
Thread.sleep(250);
} catch (InterruptedException e) {
break;
}
} else {
FieldFrontService.this._receiveTransmission(tx);
}
}
}
}
// ...
}
Комментарии:
1. Это что-то вроде кэширования? Должен ли _qRec быть изменчивым?
2. Я получил отличные ответы, которые помогают мне в моей конкретной ситуации, спасибо всем. Я чувствую, однако, что в этой теме все еще не хватает некоторого ответа на вопрос, почему приведенный выше код не работает, чтобы быть более полезным в целом. Возможно, это моя вина, я подумаю о том, что я могу сделать, чтобы улучшить ситуацию.
3. Я думаю, что моим основным предложением было бы использовать общую переменную в качестве флага остановки, а не использовать прерывания.
4. Для начала, вы не будете ждать через sleep, если вы действительно не знаете, что делаете. Управление потоком с помощью Thread.interrupt() возможно, не очень легко сделать это правильно. Использование CLQ для того, что вы пытаетесь, великолепно и превосходит любое использование очереди исполнителя / блокировки, когда все сделано правильно для таких случаев, как один потребитель / много производителей.
5. Но у нас здесь нет объяснения причины.
Ответ №1:
Возможно, вам лучше использовать ExecutorService, который объединяет пул потоков и очередь. Также это работает. 😉
public class FieldFrontService {
private final ExecutorService executor = Executors.newSingleThreadExecutor();
void receiveTransmission(final Transmission tx) {
executor.execute(new Runnable() {
public void run() {
FieldFrontService.this._receiveTransmission(tx);
}
});
}
}
Комментарии:
1. Мне нравится этот код… Но мы все еще не знаем, почему код Ханно Фица работает не так, как задумывалось.
2. Неплохо. В более старой версии у меня был пул потоков, но это вызвало кучу проблем с параллелизмом без каких-либо преимуществ от параллельной обработки, поэтому я отказался от него. Наличие «пула» только с одним потоком мне даже в голову не приходило. Дух. Тем не менее, Николас прав, я не стал умнее из-за пропущенных вставок.
3. @Hanno, основываясь на вашем примере кода, вы можете захотеть иметь запланированную задачу, которая содержит содержимое вашего цикла, а не более крупную задачу, которая оборачивает цикл while вокруг сути проблемы.
Ответ №2:
Я думаю, что ваше InterruptedException где-то в вашем стеке вызовов поглощается.
Этот шаблон — это то, что меня беспокоит:
try {
Thread.sleep(250);
} catch (InterruptedException e) {
break;
}
Если вы используете аналогичный шаблон в своем коде, то состояние прерывания потока было потеряно. Например, если Thread.interrupted()
они использовались в вашем стеке вызовов, это сбросит статус прерванного вашего потока. В результате ваш цикл while никогда не завершится: статус был сброшен на false .
Эти две строки вызывают беспокойство:
Transmission tx = FieldFrontService.this._qRec.poll();
// ....
FieldFrontService.this._receiveTransmission(tx);
Если любой из этих вызовов вызовет Thread.currentThread().interrupted()
, вы потеряете isInterrupted
логическое значение. Я бы посмотрел на реализацию обоих этих методов на случай, если используется исключение / состояние.
Моя обработка прерываемых исключений отличается. Я предпочитаю повторно перенаправлять прерывание:
try {
Thread.sleep(250);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Таким образом, я гарантирую, что нет никакой возможности, что мое прерывание было потеряно. В принципе, я пытаюсь сказать, нет, серьезно, ОСТАНОВИТЕ поток, ОСТАНОВИТЕ!
Комментарии:
1. Хороший момент в том, что InterruptedException используется, хотя у меня серьезные проблемы с представлением, где это может быть (я ничего не упустил, код взят прямо из моей IDE). Я все же проведу расследование.
2. @Hanno, я добавлю несколько примечаний к ответу, чтобы указать, где я был бы обеспокоен.
Ответ №3:
Не потому ли, что оператор break завершает цикл while, вызывая остановку потока? Что произойдет, если вы удалите оператор break? Или это то, чего вы ожидаете?
Я предполагаю, что поток не видит новые элементы, потому что поток уже остановлен из-за исключения, которого вы не ожидали.
Комментарии:
1. Поток не останавливается. В моей конкретной ситуации это фактически уменьшает ущерб, потому что я могу прервать поток, а затем очередь обрабатывается (sic). Однако это чертовски сбивает меня с толку всякий раз, когда я пытаюсь выяснить, почему, похоже, он ничего не опрашивает из очереди.
2. Я нахожу очень странным, что это никогда не останавливается. Интересно, что произойдет с кодом, если прерывание произойдет из-за блока ожидания. Я думаю, что лучше всего избегать использования прерывания.