как передавать потоковые записи с помощью pg-запроса-потока с асинхронным/ожиданием?

#node.js #postgresql #node-postgres

Вопрос:

Используя pg-query-stream/node-postgres, я пытаюсь реализовать служебный метод, который можно использовать для последовательной (и асинхронной) обработки каждой записи из запроса. Во время обработки каждой записи обработчик может вернуться false , чтобы выйти из итерации и немедленно вернуться. Этот служебный метод сам решает true , прошел ли он все записи, и обработчик разрешен true для каждой из них, и он false решает, разрешался ли обработчик когда-либо false . Ниже приведено то, что я реализовал, и это (в основном), похоже, работает идеально. Однако, когда под большой нагрузкой, в конечном итоге мой пул postgres переходит в совершенно не реагирующее состояние-я предполагаю, что плохие соединения возвращаются в пул, и следующая команда, выданная для них, просто зависает на неопределенный срок? или что-то в этом роде…Конечно, я не могу воспроизвести это достоверно. Документы в потоке pg-запросов довольно скудны, поэтому я не совсем уверен, что использую его правильно. Когда я заканчиваю поток рано, правильно ли просто позвонить stream.destroy() и вернуть клиента в пул? Должен ли я вообще звонить stream.destroy() об успешном завершении и/или об ошибке? Ожидается ли, что событие «конец» не соответствует статусу «приостановлено» потока? Является ли «конец» даже правильным событием для этой цели или «закрыть» более правильным? В чем разница между «конец» и «конец»? Мы высоко ценим любую информацию о правильном использовании, спасибо!

   public async forEachRecord(
    queryText: string,
    values: any[],
    recordHandler: (record: TRecord) => boolean | Promise<boolean>,
  ): Promise<boolean> {
  
    const query = new QueryStream(queryText, values);
    const client = await this.pool.connect();
    let error: any;
    return new Promise<boolean>((resolve, reject) => {
      let hasEnded = false;

      query.on('data', async (data) => {
        query.pause();
        try {
          if (await recordHandler(data)) {
            query.resume();
            return;
          }
          resolve(false);
        } catch (err) {
          reject(err);
        } finally {
          if (hasEnded) {
            resolve(true);
          }
        }
      });

      query.on('end', async () => {
        hasEnded = true;
        // 'end' can come while the last recordHandler() is still executing asynchronously, while the stream
        // is paused...so we need to do this check so the last recordHandler()'s return value is honored
        if (!query.isPaused()) {
          resolve(true);
        }
      });

      query.on('error', (err) => {
        error = err;
        reject(err);
      });

      client.query(query);

    }).finally(() => {
      query.destroy();
      client.release(error);
    });
  }
 

Комментарии:

1. Можно ли его использовать pool.query ? Он сам справится с очисткой.

2. Поскольку потоки узлов реализуют интерфейс асинхронного итератора, я бы рекомендовал использовать for await (const data of query) { if (!await recordHandler(data)) return false; } return true;

3. Благодаря Bergi 🙏, я переключил свою реализацию на этот подход, и, похоже, он отлично работает без проблем при нагрузке. Мне все еще очень любопытно, что было не так с моей первоначальной реализацией,, но если вы хотите представить это в качестве ответа, я приму его 👍

4. Этого я тоже не знаю, но приятно знать, что простой код по умолчанию все делает правильно 🙂