I'm building a NodeJS server with ExpressJS that receives data (anywhere from 50KB to over 100MB) via a POST request from a desktop app, processes it, and returns the result. The desktop app gzip-compresses the data before sending it (50KB becomes about 4KB).
I want the server to decompress the data, extract the values it contains (strings, integers, chars, arrays, JSON, etc.), process those values, and then respond with the processed data.
I started with this:
apiRoute.route("/convert").post(bodyParser.raw({limit: '100Mb'}), (req, res) =>{
let outputData;
//extract values from req.body Buffer and do math on them.
//save processed data in outputData
res.json({
status: true,
data: outputData
});
});
This works because body-parser decompresses the data into a Buffer at req.body, which is held entirely in memory. That is my main issue... memory usage. I do not want to store the entire dataset in memory.
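For example, pulling typed values out of that buffered req.body is easy with the Buffer API (the field layout below is made up just to illustrate):
// Hypothetical layout: a 4-byte little-endian count followed by that many 64-bit floats.
// This only works because the entire decompressed payload sits in memory as req.body.
const count = req.body.readUInt32LE(0);
const values = [];
for (let i = 0; i < count; i++) {
    values.push(req.body.readDoubleLE(4 + i * 8));
}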
To resolve this I removed body-parser and instead piped the request stream directly into a zlib transform stream:
apiRoute.route("/convert").post((req, res) =>{
req.pipe(zlib.createGunzip());
});
The issue now is that I don't know how to extract binary values from the stream.
This is what I would LIKE to be able to do:
apiRoute.route("/convert").post((req, res) =>{
let binaryStream = new stream.Transform();
req
.pipe(zlib.createGunzip())
.pipe(binaryStream);
let aValue = binaryStream.getBytes(20);//returns 20 bytes
let bValue = binaryStream.getBytes(20000);//returns the next 20000 bytes
//etc...
});
However, I don't know of any way to accomplish this. Modules like Dissolve come close, but they require you to set up the parsing logic ahead of time, and all the grabbed values are kept in memory.
Plus I don't know how to respond with the outputData without also loading it all into memory.
So my question is, how do I...
I solved my own problem. I'm not 100% confident this is the best way to accomplish this, so I'm open to suggestions.
I made a subclass of stream.Transform and implemented the _transform method. I discovered that the next data chunk only gets delivered when the _transform callback is called. Knowing this, I store that callback function as a property and only call it when I need the next chunk.
getBytes(size) is a method that grabs a specified number of bytes from the current chunk (also saved as a property) and calls the previously saved callback when the next chunk is needed. This is done recursively to account for varying chunk sizes and varying numbers of requested bytes: for example, if only 5 bytes remain in the current chunk and getBytes(20) is called, those 5 bytes are sliced off, the next chunk is awaited, and the remaining 15 bytes are requested recursively.
Then with a mix of async/await and promises, I was able to keep this entire process asynchronous (afaik) and backpressured.
const {Transform} = require('stream'),
    events = require('events');

class ByteStream extends Transform {
    constructor(options) {
        super(options);
        this.event_emitter = new events.EventEmitter();
        this.hasStarted = false;
        this.hasEnded = false;
        this.currentChunk = null;   // chunk currently being read from
        this.nextCallback = null;   // saved _transform callback; calling it requests the next chunk
        this.pos = 0;               // read position within currentChunk
        this.on('finish', () => {
            this.hasEnded = true;
            this.event_emitter.emit('chunkGrabbed');
        });
    }

    _transform(chunk, enc, callback) {
        // Hold on to the chunk and the callback; the callback is only invoked
        // from doNextCallback() when more bytes are actually needed.
        this.pos = 0;
        this.currentChunk = chunk;
        this.nextCallback = callback;
        if (!this.hasStarted) {
            this.hasStarted = true;
            this.event_emitter.emit('started');
        } else {
            this.event_emitter.emit('chunkGrabbed');
        }
    }

    doNextCallback() {
        // Resolves once _transform (or 'finish') delivers the next chunk.
        return new Promise((resolve, reject) => {
            this.event_emitter.once('chunkGrabbed', () => { resolve(); });
            this.nextCallback();
        });
    }

    async getBytes(size) {
        if (this.pos + size > this.currentChunk.length) {
            // Not enough bytes left in the current chunk: take what remains,
            // then ask for the next chunk and recurse for the rest.
            let bytes = this.currentChunk.slice(this.pos, this.currentChunk.length);
            if (!this.hasEnded) {
                let newSize = size - (this.currentChunk.length - this.pos);
                // grab next chunk
                await this.doNextCallback();
                if (!this.hasEnded) {
                    this.pos = 0;
                    let recurseBytes = await this.getBytes(newSize);
                    bytes = Buffer.concat([bytes, recurseBytes]);
                }
            }
            return bytes;
        } else {
            let bytes = this.currentChunk.slice(this.pos, this.pos + size);
            this.pos += size;
            return bytes;
        }
    }
}

module.exports = {
    ByteStream: ByteStream
};
My Express route is now:
apiRoute.route("/convert").post((req, res)=>{
let bStream = new ByteStream({});
let gStream = zlib.createGunzip();
bStream event_emitter.on('started', async () => {
console.log("started!");
let myValue; await bStream.getBytes(60000).then(bytes => {myValue = bytes});
console.log(myValue.length);
});
req
.pipe(gStream)
.pipe(bStream);
});
By listening for the started event I can tell when the first chunk has been streamed into bStream. From there, it's just a matter of calling getBytes() with my desired byte count and assigning the promised value to a variable. It does just what I need, although I haven't done any rigorous testing yet.
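To also avoid buffering the response (the other half of my problem), I'm thinking the handler can write processed bytes to res as it goes instead of building up an outputData object first. A rough, untested sketch, with a record layout I invented purely for illustration (a 4-byte count followed by int32/double pairs):
bStream.event_emitter.on('started', async () => {
    res.setHeader('Content-Type', 'application/octet-stream');
    // Hypothetical layout: 4-byte record count, then (int32, double) pairs.
    const count = (await bStream.getBytes(4)).readUInt32LE(0);
    for (let i = 0; i < count; i++) {
        const record = await bStream.getBytes(12);
        const value = record.readDoubleLE(4);      // record.readInt32LE(0) would be the id
        const processed = Buffer.alloc(8);
        processed.writeDoubleLE(value * 2, 0);     // placeholder for the real math
        res.write(processed);                      // stream out as we go; nothing accumulates
    }
    res.end();
});
Since res.write() returns false when the client can't keep up, a fuller version would also wait for the response's 'drain' event to keep backpressure on the output side.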