Очередь RabbitMQ не используется параллельно несколькими потребительскими службами

#node.js #rabbitmq #node-amqplib

#node.js #RabbitMQ #узел-amqplib

Вопрос:

Я работаю над NodeJS и RabbitMQ, где —

  • Основная служба NodeJS (один экземпляр) Помещает данные в очередь.
  • Подчиненные службы NodeJS (несколько экземпляров) потребляют данные и обрабатывают их.
  • Используется обмен по умолчанию.

Подчиненная служба работает в режиме кластера PM2, что означает, что у меня есть 8 экземпляров для запуска подчиненной службы.

Я ожидал, что, когда главная служба начнет передавать данные через очередь, подчиненные службы должны начать использовать их асинхронно.

Например, если главная служба отправляет 10 заданий через очередь, и если выполнение каждого задания занимает 5 секунд, для завершения задания подчиненным требуется 50 секунд.

Это полностью противоречит цели использования нескольких подчиненных устройств, поскольку я бы хотел, чтобы подчиненные устройства выполняли 8 заданий одновременно.

В соответствии с панелью мониторинга RabbitMQ, приведенная выше настройка создает —

  • 9 подключений (1 ведущий 8 подчиненных)
  • 9 каналов (1 ведущий 8 подчиненных)
  • 1 очередь

Вся настройка использует exchange по умолчанию.

Мой вопрос —

Почему подчиненные устройства не могут считывать данные из очереди асинхронно?

Даже если я настроен noAck на true следующий элемент в очереди, он не будет выбран до тех пор, пока не будет обработан текущий элемент

Я намерен увеличить скорость использования очереди несколькими подчиненными экземплярами пользователя, но я думаю, что здесь я что-то упускаю.

Вот кодовая база —

 const rabbitMq = require("amqplib");

class RabbitMQClient {

    async connect() {
        this.connection = await rabbitMq.connect("amqp://admin:password@localhost");
        this.channel = await this.connection.createChannel();
        await this.channel.assertQueue("TEST_QUEUE");
    }
}


// MASTER CODE (This Runs In Fork Mode - 1 Instance)
const master_client = new RabbitMQClient();
master_client.connect().then(() => {
    // sending in 10 messages
    for (let index = 1; index <= 10; index  ) {
        const data = Buffer.from(index.toString(), "utf8");
        master_client.channel.sendToQueue("TEST_QUEUE", data);
    }
});


// SLAVE CODE (This Runs In Cluster Mode - 8 Instances)
const slave_client = new RabbitMQClient();
// connect to rabbitmq
slave_client.connect().then(() => {
    // consume the messages
    slave_client.channel.consume("TEST_QUEUE", (data) => {
        // timeout to add delay
        setTimeout(() => {
            RabbitMQClient._channel.ack(data);
        }, 5000);
    });
});
  

Подчиненный вывод —

 33|slave | 2020-11-02 13:19:09.293  00:00: recieved message - 1  (13-19-09)
34|slave | 2020-11-02 13:19:14.293  00:00: recieved message - 2  (13-19-14)
35|slave | 2020-11-02 13:19:19.299  00:00: recieved message - 3  (13-19-19)
36|slave | 2020-11-02 13:19:24.299  00:00: recieved message - 4  (13-19-24)
37|slave | 2020-11-02 13:19:29.300  00:00: recieved message - 5  (13-19-29)
38|slave | 2020-11-02 13:19:34.299  00:00: recieved message - 6  (13-19-34)
39|slave | 2020-11-02 13:19:39.301  00:00: recieved message - 7  (13-19-39)
40|slave | 2020-11-02 13:19:44.301  00:00: recieved message - 8  (13-19-44)
33|slave | 2020-11-02 13:19:49.299  00:00: recieved message - 9  (13-19-49)
34|slave | 2020-11-02 13:19:54.300  00:00: recieved message - 10 (13-19-54)
  

Если вы заметили, разные подчиненные устройства собирают сообщения циклическим способом, но они работают синхронно.

Спасибо!

Ответ №1:

Очереди в RabbitMQ являются однопоточными, которые не могут отправлять сообщения параллельно одновременно. Но это не означает, что потребители не обрабатывают одновременно. Когда потребляет один потребитель, другие потребители, привязанные к той же очереди, могут одновременно получить сообщение и начать потребление.

Как увеличить пропускную способность

В общем, всегда полезны следующие советы:

  • Разделите свои очереди на разные ядра. Вы можете использовать плагин сегментирования RabbitMQ, который может автоматически сегментировать очередь, или вы можете сделать это вручную
  • Установите оптимальное количество предварительной выборки для каждого потребителя, что может снизить сетевые затраты потребителей на получение сообщений
  • 1 канал на потребителя. То же, что и очередь, канал также является однопоточным

Для получения более подробной информации вы можете посетить здесь