How to create a readable stream with an async data source in NodeJs?

Environment: NodeJS, Express, DynamoDB (but could be any database really)

Scenario: Need to read a large number of records and return to the user as a downloadable file. This means that I cannot buffer all the content at once and then send it in a response from Express. Also, I may need to execute the query multiple times since all data might not be returned in one query.

Proposed Solution: Use a readable stream that can be piped to the response stream in Express.

I started by creating an object that inherits from stream.Readable and implemented a _read() method which pushes the query results. The problem is that the database query invoked in _read() is async but the stream.read() is a sync method.

When the stream is piped to the response of the server, the read is invoked several times before the db query even got a chance to execute. So the query is invoked multiple times and even when the first instance of the query finishes and does a push(null), the other queries complete and I get a "push() after EOF" error.
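The error itself can be reproduced in isolation, without a database. The following is a minimal sketch, assuming a stream with a no-op read() and chunks pushed by hand; the variable names are illustrative only:

```javascript
const { Readable } = require('stream');

// read() is a no-op here because the chunks are pushed manually below
const stream = new Readable({ read() {} });

const errors = [];
stream.on('error', (err) => {
  errors.push(err.code);
  console.log('stream error:', err.message);
});

stream.push('first'); // result of the first query
stream.push(null);    // first query signals end of stream
stream.push('late');  // a second, slower query finishes after EOF
```

Any push() that arrives after push(null) is rejected, which is what happens when several overlapping queries complete one after another.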

  1. Is there a way to do this properly with _read()?
  2. Should I forget about _read() and just execute the query and push() results in the constructor?
  3. Should I execute the query and emit data events instead of push()?

Thank you

    function DynamoDbResultStream(query, options){
        if(!(this instanceof DynamoDbResultStream)){
            return new DynamoDbResultStream(query, options);
        }

        Readable.call(this, options);

        this.dbQuery = query;
        this.done = false;
    }
    util.inherits(DynamoDbResultStream, Readable);

    DynamoDbResultStream.prototype._read = function(){
        var self = this;
        if(!this.done){
            dynamoDB.query(this.dbQuery, function(err, data) {
                if (!err) {
                    try{
                        for(i=0;i<data.Items.length;i++){
                            self.push(data.Items[i]);
                        }
                    }catch(err){
                        console.log(err);
                    }
                    if (data.LastEvaluatedKey) {
                        //Next read() should invoke the query with a new start key
                        self.dbQuery.ExclusiveStartKey = data.LastEvaluatedKey;
                    }else{
                        self.done=true;
                        self.push(null);
                    }
                }else{
                     console.log(err);
                     self.emit('error',err);
                }
            });
        }else{
            self.push(null);
        }
    };

EDIT: After posting this question, I've found this post with an answer that shows how to do it without using inheritance: How to call an asynchronous function inside a node.js readable stream

A comment was made there that inside _read() there should only be one push(). And each push() will usually generate another read() invocation.
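One way to keep _read() while making sure only a single query is in flight is a guard flag. The following is a rough sketch under stated assumptions, not a tested DynamoDB implementation: `PagedQueryStream`, `fetchPage`, and `PAGES` are hypothetical names, and `fetchPage` is an in-memory stand-in that resolves to `{ items, lastKey }` the way a wrapper around the real query might.

```javascript
const { Readable } = require('stream');

// Hypothetical stand-in for a paged database query: resolves one page of
// items plus the key of the next page (undefined when there are no more).
const PAGES = [['a', 'b'], [], ['c']];
function fetchPage(startKey = 0) {
  return new Promise((resolve) => {
    setTimeout(() => {
      const next = startKey + 1;
      resolve({
        items: PAGES[startKey],
        lastKey: next < PAGES.length ? next : undefined,
      });
    }, 10);
  });
}

class PagedQueryStream extends Readable {
  constructor(fetchPageFn, options) {
    super({ ...options, objectMode: true });
    this.fetchPageFn = fetchPageFn;
    this.startKey = undefined;
    this.fetching = false; // true while a query is in flight
    this.finished = false; // true once the last page has been received
  }

  _read() {
    // Ignore extra _read() calls while a query is pending or after the last page.
    if (this.fetching || this.finished) return;
    this.fetching = true;

    this.fetchPageFn(this.startKey).then(({ items, lastKey }) => {
      this.startKey = lastKey;
      this.finished = lastKey === undefined;
      this.fetching = false;

      for (const item of items) this.push(item);

      if (this.finished) {
        this.push(null); // exactly one end-of-stream signal
      } else if (items.length === 0) {
        this._read(); // empty page: nothing was pushed, so request the next one
      }
    }, (err) => this.destroy(err));
  }
}
```

For the DynamoDB case, `fetchPage` would presumably wrap dynamoDB.query and map LastEvaluatedKey to `lastKey`. The stream is in object mode, so records need to be serialized to strings or Buffers before being piped to an HTTP response.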

swbandit asked Mar 10 '16 15:03

1 Answer

Be aware of the different modes of Stream: https://nodejs.org/api/stream.html#stream_two_modes

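    const express = require('express');
    const { Readable } = require('stream');

    const app = express();

    // Pushes five values, one every 500 ms, to simulate an async data source.
    function fetchMyAsyncData(readable, i = 0) {
      setTimeout(() => {
        // still remains in paused mode
        // push() accepts only strings or Buffers unless objectMode is set
        readable.push(String(++i));

        if (i === 5) {
          // push(null) signals the end of the data; 'end' is then emitted by the stream
          return readable.push(null);
        }

        fetchMyAsyncData(readable, i);
      }, 500);
    }

    // "The res object is an enhanced version of Node’s own response object and supports all built-in fields and methods."
    app.get('/mystreamingresponse', (req, res) => {
      // starts in paused mode; read() is a no-op because data is pushed from outside
      const readable = new Readable({ read() {} });

      // remains in paused mode; write everything that is currently buffered
      readable.on('readable', () => {
        let chunk;
        while ((chunk = readable.read()) !== null) {
          res.write(chunk);
        }
      });

      fetchMyAsyncData(readable);

      // closes the response stream once all external data arrived
      readable.on('end', () => res.end());
    });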
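On Node versions that provide Readable.from() (12.3 and later, as far as I know), the paging loop can also be written as an async generator, which sidesteps the _read() bookkeeping entirely. This is only a sketch under assumptions: `fetchPage`, `allItems`, `createResultStream`, and `PAGES` are hypothetical names, and `fetchPage` is an in-memory stand-in for the real paged query.

```javascript
const { Readable } = require('stream');

// Hypothetical stand-in for a paged query: resolves { items, lastKey },
// where lastKey is undefined once the final page has been returned.
const PAGES = [[{ id: 1 }, { id: 2 }], [{ id: 3 }]];
async function fetchPage(startKey = 0) {
  const next = startKey + 1;
  return {
    items: PAGES[startKey],
    lastKey: next < PAGES.length ? next : undefined,
  };
}

// Yields one newline-delimited JSON string per record. The next page is
// requested only after every record of the current page has been consumed.
async function* allItems(fetchPageFn) {
  let startKey;
  do {
    const { items, lastKey } = await fetchPageFn(startKey);
    for (const item of items) {
      yield JSON.stringify(item) + '\n';
    }
    startKey = lastKey;
  } while (startKey !== undefined);
}

function createResultStream(fetchPageFn) {
  return Readable.from(allItems(fetchPageFn));
}
```

In an Express handler the result could then be sent with `createResultStream(fetchPage).pipe(res)`. Because the generator is pulled one value at a time, only one query runs at once and the stream ends exactly once, when the generator returns.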
dsdenes answered Sep 28 '22 02:09
