Параллельная очередь, похожая на тему, в обычной Java

#java #concurrency #queue

#java #параллелизм #очередь

Вопрос:

Прежде чем я изобрету велосипед, существует ли параллельная очередь, похожая на тему, в обычной Java? У меня есть следующие требования:

  • Несколько читателей / потребителей
  • Несколько авторов / продюсеров
  • Каждое сообщение должно быть использовано каждым (активным) потребителем
  • После того, как каждый потребитель прочитает сообщение, оно должно стать мусором (т. Е. Больше никаких ссылок)
  • Запись в очередь не должна составлять O (N) по отношению к числу потребителей
  • Параллельный, предпочтительно неблокирующий
  • Не на основе JMS: это для гораздо более легкой / встраиваемой среды

Это практически все, что мне нужно. Есть какие-либо указания?

Комментарии:

1. Я не знаю, что вы имеете в виду под «простой Java», но EventBus ( eventbus.org ) может быть, стоит что-то проверить.

2. Спасибо, я проверил EventBus. Это хороший проект, но не совсем соответствующий моим потребностям.

Ответ №1:

По сути, вы говорите о мультиплексировании, и нет, в стандартной библиотеке чего-то нет, но создать ее довольно просто. Предполагая, что ваши клиенты не заинтересованы в сообщениях, опубликованных до их подписки, тогда вам нужен пул очередей для каждого потребителя, и публикация просто предлагает элемент каждой очереди:

 public class Multiplexer<M> {
  private final List<BlockingQueue<M>> consumers 
    = new CopyOnWriteArrayList<BlockingQueue<M>>();

  public void publish(M msg) {
    for (BlockingQueue<M> q : consumers) {
      q.offer(msg);
    }
  }

  public void addConsumer(BlockingQueue<M> consumer) {
    consumers.add(consumer);
  }
}
  

Эта версия позволяет пользователям использовать любую реализацию блокирующей очереди, которую они могут захотеть. Очевидно, что вы могли бы предоставить стандартную реализацию и приятный интерфейс для клиента, если хотите.

Комментарии:

1. Это работает, предполагая, что вы согласны с тем, что потребители фактически блокируют offer вызов. Это тот же совет для любого шаблона «слушатель» — слушатели должны выполнять свои действия как можно быстрее, что может означать, что другой поток должен выполнить работу. Если это обычный сценарий, возможно, мультиплексору следует создать поток для каждого потребителя (или какой-то пул потоков) для управления отправкой сообщений.

2. Спасибо, но это решение увеличивает время линейной публикации с потребителями. Я отредактировал сообщение, чтобы прояснить это.

3. @AngerClown любая реализация BlockingQueue, которая блокирует offer(e) , фундаментально нарушена. Что касается действий слушателя и потоков, потребители опрашивают / извлекают из очереди, когда захотят.

4. @daniel-yokomizo зачем вам предлагать / публиковать постоянное время? сколько у вас потребителей?

5. Я пишу систему многоадресной рассылки, поэтому потенциально тысячи.

Ответ №2:

третьего условия нет в обычной java, но вы можете использовать неблокирующую связанную очередь с отдельным заголовком для каждого потребителя (вы можете полагаться на GC для сбора узлов без ссылок)

Комментарии:

1. Очень интересная статья. Я реализую это для сравнения с моим решением.

Ответ №3:

Самая простая стратегия — передать сообщение каждому потребителю, у меня не было бы так много потребителей, что количество потребителей важно. Вы можете добавлять сообщения десяткам пользователей за несколько микросекунд.

Один из способов избежать этого — иметь кольцевой буфер с большим количеством считывателей. Это сложно реализовать и означает, что потребители будут ограничены в количестве источников сообщений, которые они могут иметь.

Ответ №4:

Имейте только одного псевдопотребителя и позвольте реальным потребителям регистрироваться у псевдопотребителя. Когда производитель отправляет сообщение, псевдопотребитель просыпается и использует сообщение. При получении сообщения псевдопотребитель создает отдельное Runnable для каждого реального потребителя, зарегистрированного в нем, и выполняет их в пуле потоков.