#node.js #postgresql #node-postgres
Вопрос:
Используя pg-query-stream/node-postgres, я пытаюсь реализовать служебный метод, который можно использовать для последовательной (и асинхронной) обработки каждой записи из запроса. Во время обработки каждой записи обработчик может вернуться false
, чтобы выйти из итерации и немедленно вернуться. Этот служебный метод сам решает true
, прошел ли он все записи, и обработчик разрешен true
для каждой из них, и он false
решает, разрешался ли обработчик когда-либо false
. Ниже приведено то, что я реализовал, и это (в основном), похоже, работает идеально. Однако, когда под большой нагрузкой, в конечном итоге мой пул postgres переходит в совершенно не реагирующее состояние-я предполагаю, что плохие соединения возвращаются в пул, и следующая команда, выданная для них, просто зависает на неопределенный срок? или что-то в этом роде…Конечно, я не могу воспроизвести это достоверно. Документы в потоке pg-запросов довольно скудны, поэтому я не совсем уверен, что использую его правильно. Когда я заканчиваю поток рано, правильно ли просто позвонить stream.destroy()
и вернуть клиента в пул? Должен ли я вообще звонить stream.destroy()
об успешном завершении и/или об ошибке? Ожидается ли, что событие «конец» не соответствует статусу «приостановлено» потока? Является ли «конец» даже правильным событием для этой цели или «закрыть» более правильным? В чем разница между «конец» и «конец»? Мы высоко ценим любую информацию о правильном использовании, спасибо!
public async forEachRecord(
queryText: string,
values: any[],
recordHandler: (record: TRecord) => boolean | Promise<boolean>,
): Promise<boolean> {
const query = new QueryStream(queryText, values);
const client = await this.pool.connect();
let error: any;
return new Promise<boolean>((resolve, reject) => {
let hasEnded = false;
query.on('data', async (data) => {
query.pause();
try {
if (await recordHandler(data)) {
query.resume();
return;
}
resolve(false);
} catch (err) {
reject(err);
} finally {
if (hasEnded) {
resolve(true);
}
}
});
query.on('end', async () => {
hasEnded = true;
// 'end' can come while the last recordHandler() is still executing asynchronously, while the stream
// is paused...so we need to do this check so the last recordHandler()'s return value is honored
if (!query.isPaused()) {
resolve(true);
}
});
query.on('error', (err) => {
error = err;
reject(err);
});
client.query(query);
}).finally(() => {
query.destroy();
client.release(error);
});
}
Комментарии:
1. Можно ли его использовать
pool.query
? Он сам справится с очисткой.2. Поскольку потоки узлов реализуют интерфейс асинхронного итератора, я бы рекомендовал использовать
for await (const data of query) { if (!await recordHandler(data)) return false; } return true;
3. Благодаря Bergi 🙏, я переключил свою реализацию на этот подход, и, похоже, он отлично работает без проблем при нагрузке. Мне все еще очень любопытно, что было не так с моей первоначальной реализацией,, но если вы хотите представить это в качестве ответа, я приму его 👍
4. Этого я тоже не знаю, но приятно знать, что простой код по умолчанию все делает правильно 🙂