Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

What is a correct way to pause piped readable stream from writeable one in nodejs?

I am writing a module, which is a writeable stream. I want to implement pipe interface for my users.

If some error happens, i need to pause readable stream and emit error event. Then, user will decide - if he is ok with error, he should be able to resume to data processing.

var writeable = new BackPressureStream();
writeable.on('error', function(error){
    console.log(error);
    writeable.resume();
});

var readable = require('fs').createReadStream('somefile.txt');
readable.pipe.(writeable);

I see that node provides us with readable.pause() method, that can be used to pause readable stream. But i can't get how i can call it from my writeable stream module:

var Writable = require('stream').Writable;

function BackPressureStream(options) {
    Writable.call(this, options);
}
require('util').inherits(BackPressureStream, Writable);

BackPressureStream.prototype._write = function(chunk, encoding, done) {
    done();
};

BackPressureStream.prototype.resume = function() {
    this.emit('drain');
}

How back pressure can be implemented in a writeable stream?

P.S. It is possible to use pipe/unpipe events, that provide readable stream as a parameter. But it is also said, that for piped streams, the only chance to pause is to unpipe readable stream from writeable.

Did i got it right? I have to unpipe my writeable stream until user calls resume? And after user calls resume, i should pipe readable stream back?

like image 885
avasin Avatar asked Oct 28 '15 08:10

avasin


People also ask

What is the correct way to pipe a readable stream and a writable stream Nodejs?

To consume a readable stream, we can use the pipe / unpipe methods, or the read / unshift / resume methods. To consume a writable stream, we can make it the destination of pipe / unpipe , or just write to it with the write method and call the end method when we're done.

How do you stop a readable stream in node JS?

It can be done by switching the stream into the flowing mode, or by calling stream. read() method again and again until all the data is being consumed. console.

How do I pause node JS?

Use the await() Keyword to Pause Execution of Codes in Node. js.

What is a writable stream in Nodejs?

Writable : streams to which data can be written (for example, fs. createWriteStream() ). Readable : streams from which data can be read (for example, fs. createReadStream() ). Duplex : streams that are both Readable and Writable (for example, net.


1 Answers

What you are describing is already implemented by the pipe method itself. From Errors While Writing section in the docs:

If a Readable stream pipes into a Writable stream when Writable emits an error, the Readable stream will be unpiped.

So, as an implementer of a writable stream, your only job is to implement the _write method and emit an error when it happens. Unpiping will be handled automatically by the Stream module. And then, it is the job of the consumers of your module to pipe readable stream back if they consider an error to be non-critical. Here is how they could do that:

var writeable = new BackPressureStream();
var readable = require('fs').createReadStream('somefile.txt');

writeable.on('error', function(error) {
    // use pipe again, if error is not critical
    if (!error.critical) {
        readable.pipe(writeable);
    } else {
        readable.destroy(error);
    }
});

readable.pipe(writeable);

And inside your module:

BackPressureStream.prototype._write = function(chunk, encoding, done) {
    // call done with an error to emit 'error' event and unpipe readable stream
    done(new Error('BOOM'));
};
like image 122
danroshko Avatar answered Sep 21 '22 21:09

danroshko