#node.js #express #rabbitmq #bullmq
Вопрос:
Перефразировано в конце
NodeJS взаимодействует с другими API-интерфейсами через GRPC.
Каждый внешний API имеет свое собственное выделенное GRPC-соединение с узлом, и каждое выделенное GRPC-соединение имеет верхнюю границу одновременных клиентов, которых оно может обслуживать одновременно (например, Внешний API 1 имеет верхнюю границу 30 пользователей).
Каждый запрос к Экспресс-API может потребоваться для связи с внешним API 1, Внешним API 2 или внешним API 3 (с этого момента EAP1, EAP2 и т. Д.), И у Экспресс-API также есть верхняя граница одновременных клиентов (например, 100 клиентов), Которые могут передавать EAP.
Итак, как я думаю решить эту проблему:
- Клиент делает новый запрос к Экспресс-API.
- Промежуточное программное обеспечение, менеджер очередей, создает Билет для клиента (думайте об этом как о билете, который утверждает доступ к Системе — в нем есть основные данные Клиента (например, имя))
- Клиент получает Билет, создает Прослушиватель событий, который прослушивает событие с идентификатором билета в качестве имени события (когда Система готова принять Билет, она выдает идентификатор Билета в качестве события) и входит в «Лобби», где Клиент просто ждет, пока его идентификатор билета будет принят/объявлен (событие).
Моя проблема в том, что я не могу придумать, как реализовать способ, которым система будет отслеживать билеты, и как создать очередь на основе одновременных клиентов системы.
Прежде чем клиенту будет предоставлен доступ к Системе, сама Система должна:
- Проверьте, достиг ли экспресс-API верхнего предела для одновременных клиентов -> Если это так, ему следует просто подождать, пока не будет доступна новая позиция по билету
- Если новая позиция доступна, она должна проверить билет и выяснить, к какому API ей необходимо обратиться. Если, например, ему необходимо связаться с EAP1, он должен проверить, сколько текущих клиентов используют соединение GRPC. Это уже реализовано (каждый внешний API находится в классе, который содержит всю необходимую информацию). Если EAP1 достиг своего верхнего предела, то NodeJS следует повторить попытку позже (Но, насколько позже? Должен ли я выдавать системное событие после того, как система завершит другой запрос на EAP1?)
Я знаю об этом, но я не совсем уверен, соответствует ли это моим требованиям. Что мне действительно нужно сделать, так это поставить клиентов в очередь, и:
- Проверьте, достиг ли экспресс-API верхней границы для одновременных пользователей
- Если позиция свободна, выберите() Билет из массива Билетов
- Проверьте, достиг ли EAPx верхнего предела для одновременных пользователей
- Если верно, попробуйте другой билет (если он доступен), который должен взаимодействовать с другим EAP
- Если значение false, предоставьте доступ
Правка: Еще одна идея могла бы состоять в том, чтобы иметь две очереди быков. Один для экспресс-API (где параметр «параллелизм» может быть установлен в качестве верхней границы экспресс-API) и один для EAP. У каждой очереди EAP будет отдельный работник (чтобы установить верхние границы).
перефразированный
Чтобы быть более описательным в этом вопросе, я попытаюсь перефразировать потребности.
Простое представление о системе может быть:
Я использовал предложение Клема (RabbitMQ), но, опять же, я не могу достичь параллелизма с ограничениями (верхними границами).
Так,
- Клиент просит у оператора Билет
TicketHandler
. Для того, чтобы Продавец билетов создал новый Билет, клиент, наряду с другой информацией, предоставляетcallback
:
TicketHandler.getTicket(variousInfo, function () {
next();
})
Обратный вызов будет использоваться системой, чтобы позволить Клиенту подключиться к EAP.
- Продавец билетов получает билет:
i) Добавляет его в очередь
ii) Когда билет доступен (верхняя граница не достигнута), он запрашивает соответствующий обработчик EAP, может ли клиент использовать соединение GRPC. Если да, то просит обработчик EAP заблокировать позицию, а затем вызывает доступный обратный вызов билета (с шага 1). Если нет, TicketHandler проверяет следующий доступный билет, которому необходимо связаться с другим EAP. Это должно продолжаться до тех пор, пока обработчик EAP, который сначала сообщил TicketHandler, что «Позиция недоступна», не отправит сообщение TicketHandler, чтобы сообщить ему, что «Теперь есть X доступных позиций» (или «1 доступная позиция»). Затем TicketHandler должен проверить билет, который ранее не мог получить доступ к EAPx, и снова спросить EAPx, может ли он получить доступ к соединению GRPC.
Комментарии:
1. Эй, я думаю, бык подойдет. Если ваш EAP используется только этой системой, вы можете полагаться на конфигурацию параллелизма очередей для управления размером пула. Но, возможно, будет лучше, если вы свяжете свои различные сущности с посредником сообщений. Ваш протокол выглядит близко к protobuff, вы должны иметь возможность использовать его с RabitMQ или другим mq. С этой аркой вам, вероятно, не понадобится «верхняя граница», просто позвольте ей потребляться по мере ее поступления.
2. Привет, Клем. Спасибо за ваш вклад. Я постараюсь быть немного более описательным. Подумайте о моей реализации как о 3 частях. Первая часть-это когда клиент запрашивает билет (он сообщает Системе, что «я хочу общаться с EAPx»). Вторая часть-это то, где происходит логика. Он спрашивает EAPx в «третьей» части: доступно ли соединение GRPC или в нем полно клиентов? Если он заполнен, то системе нужно будет дождаться чего-то (может быть, события?) от третьей части, которая скажет ей: «Хорошо, вы можете использовать соединение сейчас».
3. Пока это происходит, система должна продолжать отправлять другие запросы, в которых используются другие EAP. Итак, я не совсем уверен, нужен ли мне RabbitMQ. С другой стороны, я никогда раньше не использовал посредника сообщений. Но я все еще думаю, что BullMQ был бы просто в порядке. Кстати, верхние границы (особенно для EAP) крайне необходимы, потому что в противном случае, если многие клиенты будут пользоваться услугами одновременно, они потерпят неудачу.
4. Да, вот почему использование посредника сообщений должно быть полезно для решения вашей проблемы. Потому что вам действительно не нужно больше знать, заполнен ли бассейн. Тема MQ может действовать как своего рода очередь. Вам просто нужно создать столько тем, сколько у вас есть, и заполнить нужную, когда вы получите запрос клиента. EAP будут использовать темы так быстро, как только смогут. Существует множество способов настройки очереди MQ. Одно сообщение должно использоваться только одним EAP и, например, никому не передаваться.
5. Это сделает вашу систему еще более масштабируемой. Вы сможете масштабировать EAP1 до многих экземпляров, чтобы обрабатывать больше входных данных, например.
Ответ №1:
Из вашего описания я понимаю, что следует:
- У вас есть Node.js первый ярус. Каждый Node.js поле должно быть ограничено до 100 клиентами
- У вас есть неопределенный задний уровень, который имеет GRPC-соединения с блоками на переднем уровне (назовем их EAP). Каждый ВП <-> Node.js Связь GRPS ограничена N одновременными подключениями.
То, что я вижу здесь,-это ограничения только на уровне сервера и на уровне подключения, поэтому я не вижу причин иметь какую-либо распределенную систему (например, Bull) для управления очередью (если Node.js поле умирает, никто не может восстановить контекст HTTP — запроса, чтобы предложить ответ на этот конкретный запрос, поэтому, когда Node.js блок умирает, ответы на его запросы не более полезны).
Учитывая это, я бы просто создал локальную очередь (такую же простую, как массив) для управления вашей очередью.
Отказ от ответственности: это следует считать псевдокодом, то, что следует ниже, упрощено и непроверено
Это может быть реализация очереди:
interface SimpleQueueObject<Req, Res> {
req: Req;
then: (Res) => void;
catch: (any) => void;
}
class SimpleQueue<Req = any, Res = any> {
constructor(
protected size: number = 100,
/** async function to be executed when a request is de-queued */
protected execute: (req: Req) => Promise<Res>,
/** an optional function that may ba used to indicate a request is
not yet ready to be de-queued. In such case nex request will be attempted */
protected ready?: (req: Req) => boolean,
) { }
_queue: SimpleQueueObject<Req, Res>[] = [];
_running: number = 0;
private _dispatch() {
// Queues all available
while (this._running < this.size amp;amp; this._queue.length > 0) {
// Accept
let obj;
if (this.ready) {
const ix = this._queue.findIndex(o => this.ready(o.req));
// todo : this may cause queue to stall (for now we throw error)
if (ix === -1) return;
obj = this._queue.splice(ix, 1)[0];
} else {
obj = this._queue.pop();
}
// Execute
this.execute(obj.req)
// Resolves the main request
.then(obj.then)
.catch(obj.catch)
// Attempts to queue something else after an outcome from EAP
.finally(() => {
this._running --;
this._dispatch();
});
obj.running = true;
this._running ;
}
}
/** Queue a request, fail if queue is busy */
queue(req: Req): Promise<Res> {
if (this._running >= this.size) {
throw "Queue is busy";
}
// Queue up
return new Promise<Res>((resolve, reject) => {
this._queue.push({ req, then: resolve, catch: reject });
this._dispatch();
});
}
/** Queue a request (even if busy), but wait a maximum time
* for the request to be de-queued */
queueTimeout(req: Req, maxWait: number): Promise<Res> {
return new Promise<Res>((resolve, reject) => {
const obj: SimpleQueueObject<Req, Res> = { req, then: resolve, catch: reject };
// Expire if not started after maxWait
const _t = setTimeout(() => {
const ix = this._queue.indexOf(obj);
if (ix !== -1) {
this._queue.splice(ix, 1);
reject("Request expired");
}
}, maxWait);
// todo : clear timeout
// Queue up
this._queue.push(obj);
this._dispatch();
})
}
isBusy(): boolean {
return this._running >= this.size;
}
}
А затем ваш Node.js бизнес-логика может делать что-то вроде:
const EAP1: SimpleQueue = /* ... */;
const EAP2: SimpleQueue = /* ... */;
const INGRESS: SimpleQueue = new SimpleQueue<any, any>(
100,
// Forward request to EAP
async req => {
if (req.forEap1) {
// Example 1: this will fail if EAP1 is busy
return EAP1.queue(req);
} else if (req.forEap2) {
// Example 2: this will fail if EAP2 is busy and the request can not
// be queued within 200ms
return EAP2.queueTimeout(req, 200);
}
}
)
app.get('/', function (req, res) {
// Forward request to ingress queue
INGRESS.queue(req)
.then(r => res.status(200).send(r))
.catch(e => res.status(400).send(e));
})
Или это решение позволит вам (по запросу) также принимать запросы на загруженные EAP (в общей сложности не более 100) и отправлять их, когда они будут готовы:
const INGRESS: SimpleQueue = new SimpleQueue<any, any>(
100,
// Forward request to EAP
async req => {
if (req.forEap1) {
return EAP1.queue(req);
} else if (req.forEap2) {
return EAP2.queue(req);
}
},
// Delay queue for busy consumers
req => {
if (req.forEap1) {
return !EAP1.isBusy();
} else if (req.forEap2) {
return !EAP2.isBusy();
} else {
return true;
}
}
)
Пожалуйста, обратите внимание, что:
- в этом примере, Node.js начнет выбрасывать, когда будет получено более 100 одновременных запросов (нет ничего необычного в том, чтобы выбросить 503 при регулировании)
- Будьте осторожны, когда у вас больше ограничений на регулирование (Node.js и GRPC в вашем случае), поскольку первое может привести к секундному голоданию (подумайте о получении 100 запросов для EAP1, а затем 10 для EAP2, Node.js будет заполнен запросами EAP1 и откажется от запросов EAP2 все, что делает EAP2, ничего не делает)