#node.js #rabbitmq #node-amqplib
#node.js #RabbitMQ #узел-amqplib
Вопрос:
Я работаю над NodeJS и RabbitMQ, где —
- Основная служба NodeJS (один экземпляр) Помещает данные в очередь.
- Подчиненные службы NodeJS (несколько экземпляров) потребляют данные и обрабатывают их.
- Используется обмен по умолчанию.
Подчиненная служба работает в режиме кластера PM2, что означает, что у меня есть 8 экземпляров для запуска подчиненной службы.
Я ожидал, что, когда главная служба начнет передавать данные через очередь, подчиненные службы должны начать использовать их асинхронно.
Например, если главная служба отправляет 10 заданий через очередь, и если выполнение каждого задания занимает 5 секунд, для завершения задания подчиненным требуется 50 секунд.
Это полностью противоречит цели использования нескольких подчиненных устройств, поскольку я бы хотел, чтобы подчиненные устройства выполняли 8 заданий одновременно.
В соответствии с панелью мониторинга RabbitMQ, приведенная выше настройка создает —
- 9 подключений (1 ведущий 8 подчиненных)
- 9 каналов (1 ведущий 8 подчиненных)
- 1 очередь
Вся настройка использует exchange по умолчанию.
Мой вопрос —
Почему подчиненные устройства не могут считывать данные из очереди асинхронно?
Даже если я настроен noAck
на true
следующий элемент в очереди, он не будет выбран до тех пор, пока не будет обработан текущий элемент
Я намерен увеличить скорость использования очереди несколькими подчиненными экземплярами пользователя, но я думаю, что здесь я что-то упускаю.
Вот кодовая база —
const rabbitMq = require("amqplib");
class RabbitMQClient {
async connect() {
this.connection = await rabbitMq.connect("amqp://admin:password@localhost");
this.channel = await this.connection.createChannel();
await this.channel.assertQueue("TEST_QUEUE");
}
}
// MASTER CODE (This Runs In Fork Mode - 1 Instance)
const master_client = new RabbitMQClient();
master_client.connect().then(() => {
// sending in 10 messages
for (let index = 1; index <= 10; index ) {
const data = Buffer.from(index.toString(), "utf8");
master_client.channel.sendToQueue("TEST_QUEUE", data);
}
});
// SLAVE CODE (This Runs In Cluster Mode - 8 Instances)
const slave_client = new RabbitMQClient();
// connect to rabbitmq
slave_client.connect().then(() => {
// consume the messages
slave_client.channel.consume("TEST_QUEUE", (data) => {
// timeout to add delay
setTimeout(() => {
RabbitMQClient._channel.ack(data);
}, 5000);
});
});
Подчиненный вывод —
33|slave | 2020-11-02 13:19:09.293 00:00: recieved message - 1 (13-19-09)
34|slave | 2020-11-02 13:19:14.293 00:00: recieved message - 2 (13-19-14)
35|slave | 2020-11-02 13:19:19.299 00:00: recieved message - 3 (13-19-19)
36|slave | 2020-11-02 13:19:24.299 00:00: recieved message - 4 (13-19-24)
37|slave | 2020-11-02 13:19:29.300 00:00: recieved message - 5 (13-19-29)
38|slave | 2020-11-02 13:19:34.299 00:00: recieved message - 6 (13-19-34)
39|slave | 2020-11-02 13:19:39.301 00:00: recieved message - 7 (13-19-39)
40|slave | 2020-11-02 13:19:44.301 00:00: recieved message - 8 (13-19-44)
33|slave | 2020-11-02 13:19:49.299 00:00: recieved message - 9 (13-19-49)
34|slave | 2020-11-02 13:19:54.300 00:00: recieved message - 10 (13-19-54)
Если вы заметили, разные подчиненные устройства собирают сообщения циклическим способом, но они работают синхронно.
Спасибо!
Ответ №1:
Очереди в RabbitMQ являются однопоточными, которые не могут отправлять сообщения параллельно одновременно. Но это не означает, что потребители не обрабатывают одновременно. Когда потребляет один потребитель, другие потребители, привязанные к той же очереди, могут одновременно получить сообщение и начать потребление.
Как увеличить пропускную способность
В общем, всегда полезны следующие советы:
- Разделите свои очереди на разные ядра. Вы можете использовать плагин сегментирования RabbitMQ, который может автоматически сегментировать очередь, или вы можете сделать это вручную
- Установите оптимальное количество предварительной выборки для каждого потребителя, что может снизить сетевые затраты потребителей на получение сообщений
- 1 канал на потребителя. То же, что и очередь, канал также является однопоточным
Для получения более подробной информации вы можете посетить здесь