(Как) я могу «пропустить сигнал» с помощью этого Concurrentlink queue и sleep()?

#java #multithreading #queue

#java #многопоточность #очередь

Вопрос:

В моем Java-приложении несколько потоков помещают данные в очередь, из которой другой поток (только один) принимает объекты и отправляет их.

Иногда поток-потребитель, похоже, не замечает, что в очередь добавлены новые элементы, поскольку сообщения журнала, указывающие на опрос, перестают появляться. Сообщения журнала из производящих потоков указывают на то, что эти элементы действительно поступают. Погуглив немного, я понял, что это, кажется, известно как «пропущенный сигнал». Поскольку я не wait разрабатываю и не использую блокировки, я не уверен, применимо ли это ко мне.

Что меня действительно озадачивает, так это то, что когда я прерываю поток-потребитель, он обрабатывает все элементы в очереди, а затем снова замолкает, даже не выходя.

Вот основной цикл потока-потребителя, потоки-производители выполняют такие вещи, как чтение из сокетов, и их единственное общее свойство заключается в том, что они используют add() очередь, из которой опрашивается потребитель.

 public class FieldFrontService
{
    // ...

    private ConcurrentLinkedQueue<Transmission> _qRec;

    // ...

    private Runnable _createReceptionist()
    {
        return new Runnable() {
            @Override
            public void run()
            {
                while (!Thread.currentThread().isInterrupted()) {
                    Transmission tx = FieldFrontService.this._qRec.poll();
                    if (null == tx) {
                        try {
                            Thread.sleep(250);
                        } catch (InterruptedException e) {
                            break;
                        }
                    } else {
                        FieldFrontService.this._receiveTransmission(tx);
                    }
                }
            }
        }

    // ...
}
  

Комментарии:

1. Это что-то вроде кэширования? Должен ли _qRec быть изменчивым?

2. Я получил отличные ответы, которые помогают мне в моей конкретной ситуации, спасибо всем. Я чувствую, однако, что в этой теме все еще не хватает некоторого ответа на вопрос, почему приведенный выше код не работает, чтобы быть более полезным в целом. Возможно, это моя вина, я подумаю о том, что я могу сделать, чтобы улучшить ситуацию.

3. Я думаю, что моим основным предложением было бы использовать общую переменную в качестве флага остановки, а не использовать прерывания.

4. Для начала, вы не будете ждать через sleep, если вы действительно не знаете, что делаете. Управление потоком с помощью Thread.interrupt() возможно, не очень легко сделать это правильно. Использование CLQ для того, что вы пытаетесь, великолепно и превосходит любое использование очереди исполнителя / блокировки, когда все сделано правильно для таких случаев, как один потребитель / много производителей.

5. Но у нас здесь нет объяснения причины.

Ответ №1:

Возможно, вам лучше использовать ExecutorService, который объединяет пул потоков и очередь. Также это работает. 😉

 public class FieldFrontService {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void receiveTransmission(final Transmission tx) {
        executor.execute(new Runnable() {
            public void run() {
                FieldFrontService.this._receiveTransmission(tx);
            }
        });
    }
}
  

Комментарии:

1. Мне нравится этот код… Но мы все еще не знаем, почему код Ханно Фица работает не так, как задумывалось.

2. Неплохо. В более старой версии у меня был пул потоков, но это вызвало кучу проблем с параллелизмом без каких-либо преимуществ от параллельной обработки, поэтому я отказался от него. Наличие «пула» только с одним потоком мне даже в голову не приходило. Дух. Тем не менее, Николас прав, я не стал умнее из-за пропущенных вставок.

3. @Hanno, основываясь на вашем примере кода, вы можете захотеть иметь запланированную задачу, которая содержит содержимое вашего цикла, а не более крупную задачу, которая оборачивает цикл while вокруг сути проблемы.

Ответ №2:

Я думаю, что ваше InterruptedException где-то в вашем стеке вызовов поглощается.

Этот шаблон — это то, что меня беспокоит:

 try {                             
    Thread.sleep(250);
} catch (InterruptedException e) {
    break;
} 
  

Если вы используете аналогичный шаблон в своем коде, то состояние прерывания потока было потеряно. Например, если Thread.interrupted() они использовались в вашем стеке вызовов, это сбросит статус прерванного вашего потока. В результате ваш цикл while никогда не завершится: статус был сброшен на false .

Эти две строки вызывают беспокойство:

 Transmission tx = FieldFrontService.this._qRec.poll();
// ....
FieldFrontService.this._receiveTransmission(tx); 
  

Если любой из этих вызовов вызовет Thread.currentThread().interrupted() , вы потеряете isInterrupted логическое значение. Я бы посмотрел на реализацию обоих этих методов на случай, если используется исключение / состояние.

Моя обработка прерываемых исключений отличается. Я предпочитаю повторно перенаправлять прерывание:

 try {                             
    Thread.sleep(250);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
} 
  

Таким образом, я гарантирую, что нет никакой возможности, что мое прерывание было потеряно. В принципе, я пытаюсь сказать, нет, серьезно, ОСТАНОВИТЕ поток, ОСТАНОВИТЕ!

Комментарии:

1. Хороший момент в том, что InterruptedException используется, хотя у меня серьезные проблемы с представлением, где это может быть (я ничего не упустил, код взят прямо из моей IDE). Я все же проведу расследование.

2. @Hanno, я добавлю несколько примечаний к ответу, чтобы указать, где я был бы обеспокоен.

Ответ №3:

Не потому ли, что оператор break завершает цикл while, вызывая остановку потока? Что произойдет, если вы удалите оператор break? Или это то, чего вы ожидаете?

Я предполагаю, что поток не видит новые элементы, потому что поток уже остановлен из-за исключения, которого вы не ожидали.

Комментарии:

1. Поток не останавливается. В моей конкретной ситуации это фактически уменьшает ущерб, потому что я могу прервать поток, а затем очередь обрабатывается (sic). Однако это чертовски сбивает меня с толку всякий раз, когда я пытаюсь выяснить, почему, похоже, он ничего не опрашивает из очереди.

2. Я нахожу очень странным, что это никогда не останавливается. Интересно, что произойдет с кодом, если прерывание произойдет из-за блока ожидания. Я думаю, что лучше всего избегать использования прерывания.