#java #buffer #synchronized #fifo
#java #буфер #синхронизировано #fifo
Вопрос:
Я пытаюсь создать систему, в которой один поток A добавляет элементы в буфер, затем другой поток B отвечает за чтение элементов в точном порядке, в котором они были введены, а затем выполняет с ними некоторые потенциально длительные операции.
Мое лучшее предположение:
Class B extends Thread {
Buffer fifo = BufferUtils.synchronizedBuffer(new BoundedFifoBuffer());
add(Object o) { // Thread A calls me, and doesn't deal well with delays :)
fifo.add(o); // will the sync below prevent this from happening?
// or can .add be independent of the sync ?
}
run() {
synchronized (fifo) { // why am i sync'd here? I am the only thread accessing...
while ( item in buffer ) { // also how do i check this, and block otherwise?
process(fifo.remove());
}
}
|
}
Как вы можете видеть, я даже не совсем уверен, нужна ли синхронизация. Проблема безопасности потоков, с которой я сталкиваюсь, не имеет ничего общего с доступом get(), поскольку к нему будет обращаться только один поток, но что наиболее важно, так это то, что поток A вызывает .add() без какого-либо исключения одновременного доступа во время обработки содержимого буфера потоком B.
Может быть, я слишком много думаю об этом? Безопасно ли это? Ваша оценка этой проблемы очень ценится.
С уважением,
Джей
Ответ №1:
Комментарии:
1. Действительно, интересно, но мой вопрос остается: может ли один поток добавить () в очередь, а другой удалить () ы из очереди без каких-либо проблем с одновременным доступом?
2. С BlockingQueue ответ — да, он потокобезопасен, поэтому вам не нужно беспокоиться о синхронизации. Вы можете прочитать документы download.oracle.com/javase/6/docs/api/java/util/concurrent /… : «Реализации BlockingQueue потокобезопасны …»
3. Это именно то, что мне нужно, спасибо. — LinkedBlockingQueue<E>
4. Спасибо, я, очевидно, этого не осознавал.
Ответ №2:
Если у вас есть поток символов для ведения журнала, самым быстрым подходом может быть использование канала.
PipedOutputStream pos = new PipedOutputStream();
final PipedInputStream pis = new PipedInputStream(pos, 256*1024);
ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(new Runnable() {
@Override
public void run() {
byte[] bytes = new byte[256*1024];
int length;
try {
while ((length = pis.read(bytes)) > 0) {
// something slow.
Thread.sleep(1);
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
// time latency
PrintWriter pw = new PrintWriter(pos);
long start = System.nanoTime();
int runs = 10*1000*1000;
for(int i=0;i<runs;i ) {
pw.println("Hello " i);
}
long time = System.nanoTime() - start;
System.out.printf("Took an average of %,d nano-seconds per line%n", time/runs);
es.shutdown();
С принтами
Took an average of 269 nano-seconds per line
Примечание: сам канал не создает никакого мусора. (В отличие от очереди)
Вы можете использовать ExecutorService для завершения очереди и потоков
ExecutorService es =
es.submit(new Runnable() {
public void run() {
process(o);
}
});
Комментарии:
1. Если я вас правильно понимаю, это было бы неуместно в моем случае, потому что в очереди миллионы элементов, и мне нужен только один рабочий поток для их обработки. все.
2. Не уверен, почему вы хотите, чтобы миллионы элементов были поставлены в очередь. Это звучит довольно неэффективно, но это не имеет никакого значения. У вас могут быть миллионы задач и однопоточный пул.
3. Рабочий поток предназначен для протоколирования вывода символов. Основной поток должен указать, что символ должен быть зарегистрирован, а затем перейти к более неотложной работе. Затем рабочему потоку необходимо прочитать символы из буфера и записать их в файл, потратив на это время, если это необходимо. Итак, чтобы ответить на ваш вопрос, моя очередь — это очередь символов.