Время ожидания получения соединения при потоковой передаче результатов с помощью Express

#postgresql #express #stream #knex

Вопрос:

Мы используем следующий код для передачи результатов запроса обратно клиенту:

 app.get('/events', (req, res) => {
  try {
    const stream = db('events')
      .select('*')
      .where({ id_user: 'foo' })
      .stream()

    stream.pipe(JSONStream.stringify()).pipe(res)
  } catch (err) {
    next(err)
  }
})
 

В то время как код, по-видимому, имеет отличный профиль использования памяти (стабильное/низкое использование памяти), он создает случайные тайм-ауты для подключения к БД:

Knex: Время ожидания получения соединения. Бассейн, наверное, полон. Вы пропускаете вызов. transacting(trx)?

Это происходит в производстве с кажущимися случайными интервалами. Есть идеи, почему?

Ответ №1:

Это происходит из-за того, что прерванные запросы (т. е. клиент закрывает браузер в середине запроса) не освобождают соединение обратно в пул.

Во-первых, убедитесь, что вы используете последнюю версию knex или, по крайней мере, версию v0.21.3 , которая содержит исправления для обработки потока/пула.

С этого момента у вас есть пара вариантов:

Либо используйте pipeline вместо pipe того, чтобы правильно обрабатывать прерванные запросы, вот так:

 const { pipeline } = require('stream')

app.get('/events', (req, res) => {
  try {
    const stream = db('events')
      .select('*')
      .where({ id_session: req.query.id_session })
      .stream()

    return pipeline(stream, JSONStream.stringify(), res, err => {
      if (err) {
        return console.log(`Pipeline failed with err:`, err)
      }

      console.log(`Pipeline ended succesfully`)
    })
  } catch (err) {
    next(err)
  }
})
 

или прослушайте close событие req и уничтожьте поток БД самостоятельно, вот так:

 app.get('/events', (req, res) => {
  try {
    const stream = db('events')
      .select('*')
      .where({ id_session: req.query.id_session })
      .stream()

    // Not listening to this event will crash the process if
    // stream.destroy(err) is called.
    stream.on('error', () => {
      console.log('Stream was destroyed')
    })

    req.on('close', () => {
      // stream.end() does not seem to work, only destroy()
      stream.destroy('Aborted request')
    })

    stream.pipe(JSONStream.stringify()).pipe(res)
  } catch (err) {
    next(err)
  }
})
 

Полезное чтение: