#node.js #express #asynchronous #stream #gzip
#node.js #выразить #асинхронный #поток #gzip
Вопрос:
Я создаю сервер NodeJS с ExpressJS, который обрабатывает данные (от 50 КБ до > 100 МБ), отправленные через POST-запрос из настольного приложения для обработки и возврата. Настольное приложение gzip сжимает данные перед отправкой (50 КБ становится 4 КБ).
Я хочу, чтобы сервер распаковал данные, извлек значения из данных (строки, целые числа, символы, массивы, json и т.д.), обработал эти данные, а затем ответил обработанными данными.
Я начал с этого:
apiRoute.route("/convert").post(bodyParser.raw({limit: '100Mb'}), (req, res) =>{
let outputData;
//extract values from req.body Buffer and do math on them.
//save processed data in outputData
res.json({
status: true,
data: outputData
});
});
Это работает, потому что body-parser распаковывает данные в буфер req.body
, хранящийся в памяти. Это моя главная проблема … использование памяти. Я не хочу хранить весь набор данных в памяти.
Чтобы решить эту проблему, я удалил body-parser и вместо этого передал поток запросов непосредственно в поток преобразования zlib:
apiRoute.route("/convert").post((req, res) =>{
req.pipe(zlib.createGunzip());
});
Проблема сейчас в том, что я не знаю, как извлекать двоичные значения из потока.
Это то, что я хотел бы иметь возможность делать:
apiRoute.route("/convert").post((req, res) =>{
let binaryStream = new stream.Transform();
req
.pipe(zlib.createGunzip())
.pipe(binaryStream);
let aValue = binaryStream.getBytes(20);//returns 20 bytes
let bValue = binaryStream.getBytes(20000);//returns the next 20000 bytes
//etc...
});
Однако я не знаю ни одного способа добиться этого. Такие модули, как Dissolve, близки, однако они требуют, чтобы вы заранее настроили логику синтаксического анализа, и все полученные значения сохраняются в памяти.
Кроме того, я не знаю, как ответить с помощью OutputData, не загружая все это в память.
Итак, мой вопрос в том, как мне..
- Считывать данные из потока с моей собственной скоростью асинхронно и извлекать значения внутри
- Отправляйте обработанные данные обратно в настольное приложение, не помещая все это в память
Ответ №1:
Я решил свою собственную проблему. Я не уверен на 100%, что это лучший способ добиться этого, поэтому я открыт для предложений.
Я создал подкласс stream.Transform
и реализовал _transform
метод. Я обнаружил, что следующий блок данных получает входные данные только при _transform
обратном вызове. Зная это, я сохранил эту функцию обратного вызова как свойство и вызываю ее только тогда, когда мне нужен следующий фрагмент.
getBytes(size)
это метод, который получит указанное количество байтов из текущего фрагмента (также сохраненного как свойство) и вызовет ранее сохраненный обратный вызов, если потребуется следующий фрагмент. Это делается рекурсивно для учета различных размеров блоков и различного количества запрашиваемых байтов.
Затем, используя сочетание async / await и promises, я смог сохранить весь этот процесс асинхронным (afaik) и без обратного давления.
const {Transform} = require('stream'),
events = require('events');
class ByteStream extends Transform{
constructor(options){
super(options);
this.event_emitter = new events.EventEmitter();
this.hasStarted = false;
this.hasEnded = false;
this.currentChunk;
this.nextCallback;
this.pos = 0;
this.on('finish', ()=>{
this.hasEnded = true;
this.event_emitter.emit('chunkGrabbed');
});
}
_transform(chunk, enc, callback){
this.pos = 0;
this.currentChunk = chunk;
this.nextCallback = callback;
if(!this.hasStarted){
this.hasStarted = true;
this.event_emitter.emit('started');
}
else{
this.event_emitter.emit('chunkGrabbed');
}
}
doNextCallback(){
return new Promise((resolve, reject) =>{
this.event_emitter.once('chunkGrabbed', ()=>{resolve();});
this.nextCallback();
});
}
async getBytes(size){
if(this.pos size > this.currentChunk.length)
{
let bytes = this.currentChunk.slice(this.pos, this.currentChunk.length);
if(!this.hasEnded)
{
var newSize = size-(this.currentChunk.length - this.pos);
//grab next chunk
await this.doNextCallback();
if(!this.hasEnded){
this.pos = 0;
let recurseBytes; await this.getBytes(newSize).then(bytes => {recurseBytes = bytes;});
bytes = Buffer.concat([bytes, recurseBytes]);
}
}
return bytes;
}
else{
let bytes = this.currentChunk.slice(this.pos, this.pos size);
this.pos = size;
return bytes;
}
}
}
module.exports = {
ByteStream : ByteStream
}
Мой экспресс-маршрут теперь:
apiRoute.route("/convert").post((req, res)=>{
let bStream = new ByteStream({});
let gStream = zlib.createGunzip();
bStream event_emitter.on('started', async () => {
console.log("started!");
let myValue; await bStream.getBytes(60000).then(bytes => {myValue = bytes});
console.log(myValue.length);
});
req
.pipe(gStream)
.pipe(bStream);
});
Проверяя событие started
, я могу узнать, когда был передан первый фрагмент bStream
. Оттуда это просто вопрос вызова getBytes()
с моим желаемым количеством байтов, а затем присвоения обещанного значения переменной. Это делает именно то, что мне нужно, хотя я еще не проводил тщательного тестирования.