
Node.js Streams Readable to Transform

I have been trying to use a readable and a transform stream to process a very large file. The problem that I seem to come across is that if I don't put a writable stream at the end, the program seems to terminate before the result gets returned.

Example: rstream.pipe(split()).pipe(tstream)

My tstream has an emitter that emits when a counter hits a threshold. When that threshold is set to a low number, I get a result, but when it's high, it's not returning anything. If I pipe it to a file writer, it always returns a result. Am I missing something obvious?

code:

// Dependencies
var fs = require('fs');
var rstream = fs.createReadStream('file');
var wstream = fs.createWriteStream('output');
var split = require('split'); // used for separating stream by new line
var QTransformStream = require('./transform');

var qtransformstream = new QTransformStream();
qtransformstream.on('completed', function(result) {
    console.log('Result: ' + result);
});
exports.getQ = function getQ(filename, callback) {

    // THIS WORKS if i have a low counter for qtransformstream, 
    // but when it's high, I do not get a result
    //   rstream.pipe(split()).pipe(qtransformstream);

    // this always works
    rstream.pipe(split()).pipe(qtransformstream).pipe(wstream);

};

Here is the code for the QTransformStream:

// Dependencies
var Transform = require('stream').Transform,
    util = require('util');
// Constructor, takes in the Quser as an input
var TransformStream = function(Quser) {
    // Create this as a Transform Stream
    Transform.call(this, {
        objectMode: true
    });
    // Default the Qbase to 32 as an assumption
    this.Qbase = 32;
    if (Quser) {
        this.Quser = Quser;
    } else {
        this.Quser = 20;
    }
    this.Qpass = this.Quser + this.Qbase;
    this.Counter = 0;
    // Variables used as intermediates
    this.Qmin = 120;
    this.Qmax = 0;
};
// Extend the transform object
util.inherits(TransformStream, Transform);
// The Transformation to get the Qbase and Qpass
TransformStream.prototype._transform = function(chunk, encoding, callback) {
    var Qmin = this.Qmin;
    var Qmax = this.Qmax;
    var Qbase = this.Qbase;
    var Quser = this.Quser;
    this.Counter++;
    // Stop the stream after 100 reads and emit the data
    if (this.Counter === 100) {
        this.emit('completed', this.Qbase, this.Quser);
    }
    // do some calcs on this.Qbase

    this.push('something not important');
    callback();
};
// export the object
module.exports = TransformStream;
asked Aug 04 '15 by ace040686


People also ask

What is a readable stream in Node.js?

There are four fundamental stream types in Node.js: Readable, Writable, Duplex, and Transform streams. A readable stream is an abstraction for a source from which data can be consumed. An example of that is the fs.createReadStream method.

Is Transform a Node.js stream?

Transform streams have both readable and writable features. They allow input data to be processed and then output in the processed form. To create a transform stream, we need to import the Transform class from the Node.js stream module.
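For instance, a minimal sketch of a transform stream built with the simplified constructor form (the upper-casing logic and the upperCaser name are just illustrations, not part of the question's code):

// Minimal Transform sketch: upper-cases whatever flows through it
var Transform = require('stream').Transform;

var upperCaser = new Transform({
    transform: function(chunk, encoding, callback) {
        // Push transformed data to the readable side, then signal we are done with this chunk
        this.push(chunk.toString().toUpperCase());
        callback();
    }
});

// Usage: pipe any readable source through it and on to a writable destination
process.stdin.pipe(upperCaser).pipe(process.stdout);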

What is a readable stream?

Readable: streams from which data can be read. For example, fs.createReadStream() lets us read the contents of a file. Duplex: streams that are both Readable and Writable. For example, net.Socket.

How do you switch a readable stream between modes?

One way of switching a stream to flowing mode is to attach a 'data' event listener. A way to switch a readable stream to flowing mode manually is to call the stream.resume() method.
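As a rough sketch of both approaches (the file name is made up for illustration):

var fs = require('fs');
var readable = fs.createReadStream('somefile.txt');

// Attaching a 'data' listener switches the stream into flowing mode
readable.on('data', function(chunk) {
    console.log('received ' + chunk.length + ' bytes');
});

// Alternatively, calling resume() switches to flowing mode explicitly;
// without a 'data' listener the data is simply discarded
// readable.resume();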


3 Answers

EDIT:

Also, I don't know how high your counter goes, but if you fill up the transform stream's buffer it will stop accepting data, in which case 'completed' is never actually emitted because you never reach the counter limit. Try changing your highWaterMark.
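For example, a sketch of what that could look like in the question's constructor (the value 1024 is an arbitrary illustration, not a recommendation):

// Inside the TransformStream constructor: request a larger buffer alongside objectMode
Transform.call(this, {
    objectMode: true,
    highWaterMark: 1024 // default is 16 objects when objectMode is true
});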

EDIT 2: A Little Better Explanation

As you well know, a transform stream is a duplex stream, which basically means it can accept data from a source and send data to a destination. This is commonly referred to as reading and writing, respectively. The transform stream inherits from both the readable stream and the writable stream implemented by Node.js. There is one caveat, though: the transform stream does not have to implement the _read or _write functions. In that sense you can kind of think of it as the lesser-known PassThrough stream.

If you think about the fact that the transform stream implements the writable stream, you must also consider that a writable stream always has a destination to dump its contents to. The problem you are having is that when you create a transform stream you can't specify a place to send your contents. The only way to pass data completely through your transform stream is to pipe it to a writable stream; otherwise, in essence, your stream gets backed up and can't accept more data, because there is no place for the data to go.

This is why piping to a writable stream always works. The writable stream alleviates the data backup by sending the data to a destination, so all of your data gets piped through and the 'completed' event is emitted.

The reason your code works without the writable stream when the sample size is small is that you aren't filling up your stream, so the transform stream can accept enough data for the completed event/threshold to be hit. As the threshold increases, the amount of data your stream can accept without sending it somewhere else (a writable stream) stays the same. This causes your stream to get backed up, and it can no longer accept data, which means the completed event will never be emitted.

I would venture to say that if you increase the highWaterMark for the transform stream, you will be able to increase your threshold and still have the code work. That method is really a workaround, though. Instead, pipe your stream to a writable stream that sends the data to /dev/null; the way to create that writable stream is:

var writer = fs.createWriteStream('/dev/null');

The section in the Node.js docs on buffering explains the behavior you are running into.
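Applied to the question's pipeline, that would look something like this (a sketch; /dev/null only exists on Unix-like systems):

// Give the transform's output somewhere to go so backpressure never stalls the pipeline
var devNull = fs.createWriteStream('/dev/null');

rstream.pipe(split()).pipe(qtransformstream).pipe(devNull);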

answered Oct 18 '22 by RadleyMith


You don't interrupt _transform, so processing continues far past your threshold. Try:

this.emit('completed', ...);
this.end();

That's why the "program seems to terminate before the result gets returned".
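Applied to the question's _transform, that suggestion would look roughly like this (a sketch keeping the original names; note that the upstream pipe may still try to write after end() and raise an error, so unpiping the source first may also be needed):

TransformStream.prototype._transform = function(chunk, encoding, callback) {
    this.Counter++;
    // Emit the result and end the stream once the threshold is reached
    if (this.Counter === 100) {
        this.emit('completed', this.Qbase, this.Quser);
        this.end(); // the writable side accepts no further data after this
    }
    this.push('something not important');
    callback();
};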

And to avoid writing the unneeded output to a real file:

var wstream = fs.createWriteStream('/dev/null');

Good luck)

answered Oct 18 '22 by Mi Ke Bu


I would suggest using a Writable rather than a Transform stream. Rename _transform to _write, and your code will consume the stream when you pipe to it. A transform stream, as @Bradgnar already pointed out, needs a consumer or it will stop the readable stream from pushing more data into its buffer.
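A sketch of that approach, reusing the question's names (this is a hypothetical adaptation, not the original code):

// A Writable needs no downstream consumer, so nothing gets backed up
var Writable = require('stream').Writable,
    util = require('util');

var QWritableStream = function(Quser) {
    Writable.call(this, { objectMode: true });
    this.Qbase = 32;
    this.Quser = Quser || 20;
    this.Qpass = this.Quser + this.Qbase;
    this.Counter = 0;
};
util.inherits(QWritableStream, Writable);

QWritableStream.prototype._write = function(chunk, encoding, callback) {
    this.Counter++;
    // Emit the result once the threshold is reached; no push() needed since nothing flows downstream
    if (this.Counter === 100) {
        this.emit('completed', this.Qbase, this.Quser);
    }
    callback();
};

// Usage: rstream.pipe(split()).pipe(new QWritableStream(20));
module.exports = QWritableStream;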

answered Oct 18 '22 by windm