ArrayBlockingQueue с пружинной загрузкой

#spring-boot #java-8

Вопрос:

Как я могу использовать ArrayBlockingQueue с Spring Boot ? Я настроил очередь, как показано ниже —

 @Configuration
public class MessageQueueConfig {

 @Bean
 public ArrayBlockingQueue arrayBlockingQueue() {
    ArrayBlockingQueue<Message> arrayBlockingQueue = new ArrayBlockingQueue<>(50000);
    return arrayBlockingQueue;
 }
}
 

При вызове api я предлагаю данные в очередь

 @Autowired
private MessageQueueConfig queueConfig;

...
queueConfig.arrayBlockingQueue().offer(message, 5, TimeUnit.SECONDS)
 

Для poll сообщения из очереди нужно ли использовать потоковую передачу? или как я могу poll отправить сообщение из очереди? Может напрямую использоваться @Autowired для очереди в потребителе и poll сообщения в потребителе

Комментарии:

1. Я не знаю, какая магия вводится из-за этих аннотаций, но для простой Java метод, который возвращает новую очередь при каждом вызове, был бы бессмысленным.

Ответ №1:

Прямой ответ-Нет.

ArrayBlockingQueue-это реализация интерфейса BlockingQueue, в котором реализованы блокировки.

Для справки, я публикую ниже реализацию исходного кода для методов poll() и take() ArrayBlockingQueue.

      public E poll() {
    final ReentrantLock lock = this.lock;
     lock.lock();
     try {
         if (count == 0)
             return null;
         E x = extract();
         return x;
     } finally {
         lock.unlock();
     }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
       try {
            while (count == 0)
                notEmpty.await();
        } catch (InterruptedException ie) {
             notEmpty.signal(); // propagate to non-interrupted thread
            throw ie;
        }
        E x = extract();
         return x;
    } finally {
        lock.unlock();
     }
}
 

Ответ №2:

Чтобы опросить сообщение из очереди, нужно ли использовать потоковую передачу?

Согласно документации, BlockingQueue реализации являются потокобезопасными. При необходимости вы можете использовать потоки.

Можно напрямую использовать @Autowired для очереди в потребителе и опросить сообщение в потребителе

В потребителе сделайте что-нибудь подобное:

 @Autowired
private BlockingQueue<Message> arrayBlockingQueue;

//...
arrayBlockingQueue.poll(5, TimeUnit.SECONDS)