Синхронизированное использование буфера FIFO

#java #buffer #synchronized #fifo

#java #буфер #синхронизировано #fifo

Вопрос:

Я пытаюсь создать систему, в которой один поток A добавляет элементы в буфер, затем другой поток B отвечает за чтение элементов в точном порядке, в котором они были введены, а затем выполняет с ними некоторые потенциально длительные операции.

Мое лучшее предположение:

  Class B extends Thread {

    Buffer fifo = BufferUtils.synchronizedBuffer(new BoundedFifoBuffer());

    add(Object o) { // Thread A calls me, and doesn't deal well with delays :)
      fifo.add(o); // will the sync below prevent this from happening? 
                   // or can .add be independent of the sync ?
    }

    run() {
     synchronized (fifo) { // why am i sync'd here?  I am the only thread accessing...
         while ( item in buffer ) { // also how do i check this, and block otherwise?
            process(fifo.remove());
         }
     }
    |
  }
  

Как вы можете видеть, я даже не совсем уверен, нужна ли синхронизация. Проблема безопасности потоков, с которой я сталкиваюсь, не имеет ничего общего с доступом get(), поскольку к нему будет обращаться только один поток, но что наиболее важно, так это то, что поток A вызывает .add() без какого-либо исключения одновременного доступа во время обработки содержимого буфера потоком B.

Может быть, я слишком много думаю об этом? Безопасно ли это? Ваша оценка этой проблемы очень ценится.

С уважением,

Джей

Ответ №1:

Если я не ошибаюсь, вас также может заинтересовать этот класс ArrayBlockingQueue .

Комментарии:

1. Действительно, интересно, но мой вопрос остается: может ли один поток добавить () в очередь, а другой удалить () ы из очереди без каких-либо проблем с одновременным доступом?

2. С BlockingQueue ответ — да, он потокобезопасен, поэтому вам не нужно беспокоиться о синхронизации. Вы можете прочитать документы download.oracle.com/javase/6/docs/api/java/util/concurrent /… : «Реализации BlockingQueue потокобезопасны …»

3. Это именно то, что мне нужно, спасибо. — LinkedBlockingQueue<E>

4. Спасибо, я, очевидно, этого не осознавал.

Ответ №2:

Если у вас есть поток символов для ведения журнала, самым быстрым подходом может быть использование канала.

     PipedOutputStream pos = new PipedOutputStream();
    final PipedInputStream pis = new PipedInputStream(pos, 256*1024);
    ExecutorService es = Executors.newSingleThreadExecutor();
    es.execute(new Runnable() {
        @Override
        public void run() {
            byte[] bytes = new byte[256*1024];
            int length;
            try {
                while ((length = pis.read(bytes)) > 0) {
                    // something slow.
                    Thread.sleep(1);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    });

    // time latency
    PrintWriter pw = new PrintWriter(pos);
    long start = System.nanoTime();
    int runs = 10*1000*1000;
    for(int i=0;i<runs;i  ) {
        pw.println("Hello " i);
    }
    long time = System.nanoTime() - start;
    System.out.printf("Took an average of %,d nano-seconds per line%n", time/runs);
    es.shutdown();
  

С принтами

     Took an average of 269 nano-seconds per line
  

Примечание: сам канал не создает никакого мусора. (В отличие от очереди)


Вы можете использовать ExecutorService для завершения очереди и потоков

 ExecutorService es =

es.submit(new Runnable() {
  public void run() {
     process(o);
  }
});
  

Комментарии:

1. Если я вас правильно понимаю, это было бы неуместно в моем случае, потому что в очереди миллионы элементов, и мне нужен только один рабочий поток для их обработки. все.

2. Не уверен, почему вы хотите, чтобы миллионы элементов были поставлены в очередь. Это звучит довольно неэффективно, но это не имеет никакого значения. У вас могут быть миллионы задач и однопоточный пул.

3. Рабочий поток предназначен для протоколирования вывода символов. Основной поток должен указать, что символ должен быть зарегистрирован, а затем перейти к более неотложной работе. Затем рабочему потоку необходимо прочитать символы из буфера и записать их в файл, потратив на это время, если это необходимо. Итак, чтобы ответить на ваш вопрос, моя очередь — это очередь символов.