
How to call an asynchronous function inside a node.js readable stream

This is a short example of a custom readable stream implementation. The class is called MyStream. The stream reads the file and folder names in a directory and pushes them, so that they arrive through the data event.

For comparison, I implemented two different approaches in this example: one synchronous and one asynchronous. The second argument of the constructor selects which one is used (true for asynchronous, false for synchronous).

The readcounter counts how many times the _read method is called, just to give some feedback.

var Readable = require('stream').Readable;
var util = require('util');
var fs = require('fs');
util.inherits(MyStream, Readable);

function MyStream(dirpath, async, opt) {
  Readable.call(this, opt);
  this.async = async;
  this.dirpath = dirpath;
  this.counter = 0;
  this.readcounter = 0;
}

MyStream.prototype._read = function() {
  this.readcounter++;
  if (this.async === true){
    console.log("Readcounter: " + this.readcounter);
    var that = this; // keep a reference for the callback (avoids an implicit global)
    fs.readdir(this.dirpath,function(err, files){
      that.counter ++;
      console.log("Counter: " + that.counter);
      for (var i = 0; i < files.length; i++){
        that.push(files[i]);
      }
      that.push(null);
    });
  } else {
    console.log("Readcounter: " + this.readcounter);
    var files = fs.readdirSync(this.dirpath);
    for (var i = 0; i < files.length; i++){
      this.push(files[i]);
    }
    this.push(null);
  }
};
// Instance for an asynchronous call
var mystream = new MyStream('C:\\Users', true);
mystream.on('data', function(chunk){
  console.log(chunk.toString());
});

The synchronous way works as expected, but something interesting happens when I call it asynchronously. Every time a filename is pushed via that.push(files[i]), the _read method is called again. This causes errors when the first asynchronous loop finishes and that.push(null) marks the end of the stream.

The environment I am using to test this: node 4.1.1, Electron 0.35.2.

I do not understand why _read is called so often or why this is happening. Maybe it is a bug? Or is there something I do not see at the moment? Is there a way to build a readable stream using asynchronous functions? Pushing the chunks asynchronously would be really cool, because it would be the non-blocking, stream-like way, especially with larger amounts of data.

apxp asked Dec 03 '15 13:12



1 Answer

_read is called whenever the "reader" needs data, which usually happens just after you push data.
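
To make that visible, here is a minimal sketch that is not part of the original code: a hypothetical stream whose _read pushes one item asynchronously (via setImmediate) and counts how often it gets called. The names items, readCalls and received are made up for illustration.

```javascript
var Readable = require('stream').Readable;

// Hypothetical demo data; each _read call hands out one item.
var items = ['a', 'b', 'c'];
var readCalls = 0;

var rs = new Readable();
rs._read = function () {
  readCalls++;
  var self = this;
  // Simulate an asynchronous source: push on a later tick.
  setImmediate(function () {
    // Push the next item, or null to end the stream once the list is empty.
    self.push(items.length ? items.shift() : null);
  });
};

var received = [];
rs.on('data', function (chunk) {
  received.push(chunk.toString());
});
rs.on('end', function () {
  console.log('chunks: ' + received.join(',') + ', _read calls: ' + readCalls);
});
```

Nothing here calls read() explicitly, yet the reported number of _read calls should come out larger than one, because each push leads the stream to ask for more data.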

I had the same sort of "issues" when implementing _read directly, so now I write a function that returns a stream object. It works quite well: data can't be "pulled" from my stream; it is available/pushed when I decide. With your example, I would do it like this:

var Readable = require('stream').Readable;
var fs = require('fs');

function MyStream(dirpath, async, opt) {
  var rs = new Readable();
  // needed to avoid "Not implemented" exception
  rs._read = function() { 
    // console.log('give me data!'); // << this will print after every console.log(folder);
  };

  var counter = 0;
  var readcounter = 0;

  if (async) {
    console.log("Readcounter: " + readcounter);
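    fs.readdir(dirpath, function (err, files) {
      // report a failed directory read on the stream instead of crashing on files.length
      if (err) { rs.emit('error', err); return; }
      counter++;
      console.log("Counter: " + counter);
      for (var i = 0; i < files.length; i++) {
        rs.push(files[i]);
      }
      rs.push(null);
    });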
  } else {
    console.log("Readcounter: " + readcounter);
    var files = fs.readdirSync(dirpath);
    for (var i = 0; i < files.length; i++) {
      rs.push(files[i]);
    }
    rs.push(null);
  }

  return rs;
}

var mystream = MyStream('C:\\Users', true);
mystream.on('data', function (chunk) {
  console.log(chunk.toString());
});

It doesn't directly answer your question, but it's a way to get working code.
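
If you would rather keep a class with a real _read, one possible approach is to guard _read with a flag so that the asynchronous call is started only once, no matter how often _read is called afterwards. This is only a sketch under that assumption, not verified on node 4.1.1 or Electron 0.35.2; DirStream and the started flag are hypothetical names, and process.cwd() is just an example path.

```javascript
var Readable = require('stream').Readable;
var util = require('util');
var fs = require('fs');

// DirStream is a hypothetical name; it streams the entry names of dirpath.
function DirStream(dirpath, opt) {
  Readable.call(this, opt);
  this.dirpath = dirpath;
  this.started = false; // guard: has the asynchronous call been started yet?
}
util.inherits(DirStream, Readable);

DirStream.prototype._read = function () {
  // _read may be called again after each push, so start the work only once.
  if (this.started) { return; }
  this.started = true;

  var self = this;
  fs.readdir(this.dirpath, function (err, files) {
    if (err) { self.emit('error', err); return; }
    for (var i = 0; i < files.length; i++) {
      self.push(files[i]);
    }
    self.push(null); // end of stream, signalled exactly once
  });
};

// Usage: list the current working directory (an assumed example path).
var dirstream = new DirStream(process.cwd());
dirstream.on('data', function (chunk) {
  console.log(chunk.toString());
});
```

Because later _read calls return immediately, no second fs.readdir is started and nothing is pushed after push(null).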

Shanoor answered Oct 20 '22 23:10
