I am writing a module, which is a writeable stream. I want to implement pipe interface for my users.
If some error happens, i need to pause readable stream and emit error event. Then, user will decide - if he is ok with error, he should be able to resume to data processing.
var writeable = new BackPressureStream();
writeable.on('error', function(error){
console.log(error);
writeable.resume();
});
var readable = require('fs').createReadStream('somefile.txt');
readable.pipe.(writeable);
I see that node provides us with readable.pause()
method, that can be used to pause readable stream. But i can't get how i can call it from my writeable stream module:
var Writable = require('stream').Writable;
function BackPressureStream(options) {
Writable.call(this, options);
}
require('util').inherits(BackPressureStream, Writable);
BackPressureStream.prototype._write = function(chunk, encoding, done) {
done();
};
BackPressureStream.prototype.resume = function() {
this.emit('drain');
}
How back pressure can be implemented in a writeable stream?
P.S. It is possible to use pipe/unpipe
events, that provide readable stream as a parameter. But it is also said, that for piped streams, the only chance to pause is to unpipe readable stream from writeable.
Did i got it right? I have to unpipe my writeable stream until user calls resume? And after user calls resume, i should pipe readable stream back?
To consume a readable stream, we can use the pipe / unpipe methods, or the read / unshift / resume methods. To consume a writable stream, we can make it the destination of pipe / unpipe , or just write to it with the write method and call the end method when we're done.
It can be done by switching the stream into the flowing mode, or by calling stream. read() method again and again until all the data is being consumed. console.
Use the await() Keyword to Pause Execution of Codes in Node. js.
Writable : streams to which data can be written (for example, fs. createWriteStream() ). Readable : streams from which data can be read (for example, fs. createReadStream() ). Duplex : streams that are both Readable and Writable (for example, net.
What you are describing is already implemented by the pipe
method itself. From Errors While Writing section in the docs:
If a
Readable
stream pipes into aWritable
stream whenWritable
emits an error, theReadable
stream will be unpiped.
So, as an implementer of a writable stream, your only job is to implement the _write
method and emit an error when it happens. Unpiping will be handled automatically by the Stream module. And then, it is the job of the consumers of your module to pipe readable stream back if they consider an error to be non-critical. Here is how they could do that:
var writeable = new BackPressureStream();
var readable = require('fs').createReadStream('somefile.txt');
writeable.on('error', function(error) {
// use pipe again, if error is not critical
if (!error.critical) {
readable.pipe(writeable);
} else {
readable.destroy(error);
}
});
readable.pipe(writeable);
And inside your module:
BackPressureStream.prototype._write = function(chunk, encoding, done) {
// call done with an error to emit 'error' event and unpipe readable stream
done(new Error('BOOM'));
};
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With