Environment: Node.js, Express, DynamoDB (though it could be any database)
Scenario: I need to read a large number of records and return them to the user as a downloadable file. This means I cannot buffer all the content at once and send it in a single Express response. I may also need to execute the query multiple times, since all the data might not be returned by one query.
Proposed Solution: Use a readable stream that can be piped to the response stream in Express.
I started by creating an object that inherits from stream.Readable and implemented a _read() method that pushes the query results. The problem is that the database query invoked inside _read() is asynchronous, while _read() itself is called synchronously and returns before the query completes.
When the stream is piped to the server's response, _read() is invoked several times before the first database query even gets a chance to execute, so the query ends up running multiple times concurrently. Even after the first query finishes and calls push(null), the other queries complete and I get a "push() after EOF" error.
Thank you
function DynamoDbResultStream(query, options) {
    if (!(this instanceof DynamoDbResultStream)) {
        return new DynamoDbResultStream(query, options);
    }
    Readable.call(this, options);
    this.dbQuery = query;
    this.done = false;
}
util.inherits(DynamoDbResultStream, Readable);

DynamoDbResultStream.prototype._read = function () {
    var self = this;
    if (!this.done) {
        dynamoDB.query(this.dbQuery, function (err, data) {
            if (!err) {
                try {
                    for (var i = 0; i < data.Items.length; i++) {
                        self.push(data.Items[i]);
                    }
                } catch (err) {
                    console.log(err);
                }
                if (data.LastEvaluatedKey) {
                    // The next read() should invoke the query with a new start key
                    self.dbQuery.ExclusiveStartKey = data.LastEvaluatedKey;
                } else {
                    self.done = true;
                    self.push(null);
                }
            } else {
                console.log(err);
                self.emit('error', err);
            }
        });
    } else {
        self.push(null);
    }
};
EDIT: After posting this question, I've found this post with an answer that shows how to do it without using inheritance: How to call an asynchronous function inside a node.js readable stream
A comment there notes that _read() should issue only one push(), and that each push() will usually trigger another _read() invocation.
To implement a readable stream, require the Readable class, construct an instance, and supply a read() method in the stream's options object:

const { Readable } = require('stream');

const inStream = new Readable({
    read() {}
});
A stream is an abstraction over data in programming. The Node.js Stream API has been around for a long time and serves as a uniform API for reading and writing data asynchronously.
createReadStream() is an asynchronous operation with completion events, but because opening the file is combined with reading from it, you generally don't have to treat it as asynchronous yourself: its asynchronous behavior is folded into the asynchronous reading of the file.
Be aware of the two reading modes of a Readable stream: https://nodejs.org/api/stream.html#stream_two_modes
const Readable = require('stream').Readable;

// starts in paused mode; the no-op read() lets us push data from outside
const readable = new Readable({ read() {} });
let i = 0;

function fetchMyAsyncData() {
    setTimeout(() => {
        // still remains in paused mode
        readable.push(String(++i));
        if (i === 5) {
            return readable.push(null); // signals EOF; 'end' fires once drained
        }
        fetchMyAsyncData();
    }, 500);
}

// "The res object is an enhanced version of Node's own response object and
// supports all built-in fields and methods."
app.get('/mystreamingresponse', (req, res) => {
    // remains in paused mode; read() pulls whatever has been buffered
    readable.on('readable', () => {
        const chunk = readable.read();
        if (chunk !== null) res.write(chunk);
    });
    fetchMyAsyncData();
    // closes the response stream once all external data has arrived
    readable.on('end', () => res.end());
});