Совместное использование локальной переменной потока в Java

#java #concurrency #thread-safety #thread-local

#java #параллелизм #потокобезопасность #поток-локальный

Вопрос:

В моем небольшом приложении у меня есть потоки двух классов — в основном writers и readers. Несколько считывающих устройств могут быть назначены одному записывающему устройству. Средство записи взаимодействует со своими читателями путем записи в их chunkBuffer переменную.

Теперь я не могу полностью разобраться в проблеме потокобезопасности здесь: если я не сохраню chunkBuffer в статической ThreadLocal переменной, все читатели будут использовать один chunkBuffer, что плохо. Но если я сохраню chunkBuffer в статическом ThreadLocal , writer, будучи отдельным потоком, получит свою собственную копию chunkBuffer и продолжит запись в него, в то время как ни одна из записываемых им данных не достигнет читателей. Можете ли вы объяснить мне, в чем здесь проблема? Большое вам спасибо.

Редактировать Другими словами, есть ли способ создать поле, которое будет уникальным для каждого отдельного экземпляра подкласса thread (например, ThreadLocal), но может быть доступно из других потоков по запросу?

Комментарии:

1. Почему все устройства чтения не могут совместно использовать один буфер?

2. Потому что читателю может потребоваться удалить некоторые кадры в буфере, в зависимости от скорости соединения между потоком чтения и удаленным клиентом. Таким образом, на самом деле читатели также записывают в буфер в том смысле, что они могут удалять из него содержимое.

3. Я не понимаю, почему ваши записи не доходят до читателей, если буфер записан правильно. На самом деле мне непонятно, почему вы вообще используете ThreadLocal, вы должны быть в состоянии избежать этого.

4. Каков же тогда правильный способ записи в буфер? В настоящее время я вызываю метод addChunk () для чтения из writer, и, естественно, он выполняется в потоке writer, поэтому копия chunkBuffer для чтения остается нетронутой.

5. Немного неясно, что вы пытаетесь сделать, но похоже, что у вас есть средство ввода, записывающее в некоторый буфер, а затем несколько устройств чтения, считывающих этот ввод и иногда удаляющих его. Как очищается буфер? После того, как она была прочитана? Я согласен, что вам, вероятно, не нужны ThreadLocals, но, возможно, если вы немного подробнее объясните, что вы пытаетесь сделать, мы сможем помочь…

Ответ №1:

Нет.

ThreadLocal предназначен для личных данных потока. В вашем случае вам нужны объекты для связи, поэтому вам нужны объекты другого типа.

Я думаю, что наилучшая структура для использования — это синхронизированные очереди: java.util.concurrent.LinkedBlockingQueue<E> .

Очереди позволяют производителю вставлять данные, а потребителю — получать их. Если это синхронизация, это позволяет выполнять это из разных потоков, не нарушая структуру.

Вы можете совместно использовать очередь между связанными авторами / читателями:

 Writer 1 -> queue 1 -> [readers A/B/C]

Writer 2 -> queue 2 -> [readers D/E/F]
  

У каждого автора и читателя будет свой поток. Каждый читатель попытается извлечь один элемент из своей блокировки очереди, если в ней нет элементов. Если вам нужно более разумно управлять своими читателями, вы можете попробовать более сложный подход, но я думаю, что это хорошая отправная точка.

Комментарии:

1. Итак, по сути, моя ошибка заключается в том, что вместо того, чтобы позволить читателям считывать входящие данные в их собственном потоке, я пытался поместить новые данные в их буфер из writer (что, очевидно, не сработало)?

2. Я не уверен, что сказать. Но средство записи выполняется в своем собственном потоке. Читает то же самое. Вам нужен общий объект (sync ‘ed), куда авторы могут «загружать» данные, а читатели могут «извлекать» данные. Я говорил об очередях с дискретными элементами (<E>), потому что у каждого автора есть много читателей. Если бы существовали пары запись-чтение 1 к 1, они могли бы использовать какую-то другую структуру (PipedInputStream PipedOutputStream).

3. Сказал это. Вы могли бы интегрировать обе идеи. Writer -> pipe -> UnifiedReader -> queue -> Readers. Хе-хе. Таким образом, автор может записывать данные произвольного размера. Программа UnifiedReader всегда считывала, скажем, 8k фрагментов и отправляла их в очередь, чтобы позволить читателям обрабатывать их в paralell. Но я захожу слишком далеко, не зная вашего приложения 🙂

Ответ №2:

Я не думаю, что вы хотите использовать локальную переменную ThreadLocal. Вам действительно нужен механизм уведомления.

Writer выполняет следующее:

  1. Запись в chunkBuffer
  2. Уведомлять читателя о новых данных.

Существует множество способов сделать это в Java concurrency, но обычно самый простой и безопасный — использовать ArrayBlockingQueue . В вашем случае использования это звучит так, как будто у каждого читателя будет свой собственный ArrayBlockingQueue , и ему просто нужно прочитать из очереди.

Это, конечно, будет зависеть от вашего варианта использования. Необходимо ли всегда работать с последними данными или вам нужно работать с каждой записью в chunkBuffer ? Такого рода вопросы необходимы для определения того, какой механизм вам нужен.

Примечание: Спасибо @helios за упоминание синхронных очередей, надеюсь, мой ответ добавит ценности.

Комментарии:

1. Как я уже сказал, я не могу напрямую записывать данные в chunkBuffer читателя из writer, поскольку chunkBuffer должен быть локальной переменной потока. Тем не менее, я думаю, что мне действительно следует исправить свое приложение, чтобы оно работало в соответствии с принципами, предложенными @helios.

2. Почему вы говорите, что chunkBuffer должен быть локальной переменной потока? Может ли это быть закрытой переменной в вашем объекте reader и достигать той же цели? Thread-local предназначен для вещей, которые не выходят или не входят в поток, например, гарантируя, что у каждого потока есть свой собственный уникальный случайный объект.