I'm trying to find the correct way to prematurely terminate a series of piped streams (a pipeline) in Node.js: sometimes I want to gracefully abort the stream before it has finished. Specifically, I'm dealing with mostly objectMode: true and non-native parallel streams, but this shouldn't really matter.
The problem is that when I unpipe the pipeline, data remains in each stream's buffer and is drained. This might be okay for most of the intermediate streams (e.g. Readable/Transform), but the last Writable still drains to its write target (e.g. a file, a database, a socket, or whatever). This could be problematic if the buffer contains hundreds or thousands of chunks, which take a significant amount of time to drain. I want it to stop immediately, i.e. not drain; why waste cycles and memory on data that doesn't matter?
Depending on the route I go, I receive either a "write after end" error, or an exception when the stream cannot find existing pipes.
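To illustrate the first failure mode, ending the final Writable while the upstream streams still hold buffered chunks reproduces it (a minimal sketch, not my actual code):

a.pipe(b).pipe(z);
// attempt to abort by ending the sink directly:
z.end(); // b keeps draining its buffer into z, triggering "write after end"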
What is the proper way to gracefully kill off a pipeline of streams in the form a.pipe(b).pipe(c).pipe(z)?
The solution I have come up with is 3-step:
1. unpipe each stream in the pipeline, in reverse order
2. empty the buffer of each stream that implements Writable
3. end each stream that implements Writable
Some pseudocode illustrating the entire process:
var pipeline = [ // define the pipeline
    readStream,
    transformStream0,
    transformStream1,
    writeStream
];
// build and start the pipeline
var tmpBuildStream;
pipeline.forEach(function(stream) {
    if ( !tmpBuildStream ) {
        tmpBuildStream = stream;
        return; // continue is not valid inside a forEach callback
    }
    tmpBuildStream = tmpBuildStream.pipe(stream); // chain onto the previous stream
});
// sleep, timeout, event, etc...
// tear down the pipeline
var tmpTearStream;
pipeline.slice(0).reverse().forEach(function(stream) {
    if ( !tmpTearStream ) {
        tmpTearStream = stream;
        return; // continue is not valid inside a forEach callback
    }
    tmpTearStream = stream.unpipe(tmpTearStream); // unpipe() returns the source stream
});
// empty and end the pipeline
pipeline.forEach(function(stream) {
    if ( typeof stream._writableState === 'object' ) { // empty the write buffer
        // assumes objectMode, where each buffered chunk counts as 1 toward length
        stream._writableState.length -= stream._writableState.buffer.length;
        stream._writableState.buffer = [];
    }
    if ( typeof stream.end === 'function' ) { // kill
        stream.end();
    }
});
I'm really worried about the usage of stream._writableState and modifying the internal buffer and length properties (the underscore signifies a private property). This seems like a hack. Also note that since I'm piping, things like pause and resume are out of the question (based on a suggestion I received from IRC).
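A slightly more defensive sketch of the same buffer-emptying hack (it still touches the private _writableState, and the property names below are version-dependent assumptions: older Node keeps pending writes in state.buffer, later versions in a bufferedRequest linked list):

function emptyWritableBuffer(stream) {
    var state = stream._writableState;
    if ( !state ) {
        return; // not a Writable, nothing to empty
    }
    if ( Array.isArray(state.buffer) ) { // older Node: an array of pending writes
        state.length -= state.buffer.length; // assumes objectMode (1 unit per chunk)
        state.buffer = [];
    } else if ( state.bufferedRequest ) { // later Node: a linked list instead
        state.length -= state.bufferedRequestCount || 0;
        state.bufferedRequest = null;
        state.lastBufferedRequest = null;
        state.bufferedRequestCount = 0;
    }
}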
I also put together a runnable version (pretty sloppy) you can grab from github: https://github.com/zamnuts/multipipe-proto (git clone, npm install, view readme, npm start)
In this particular case, I think you should get rid of the structure where you have 4 different, not fully customised streams. Piping them together creates a chain of dependencies that will be hard to control unless you implement your own mechanism.
I would like to focus on your actual goal here:
INPUT >---[read] → [transform0] → [transform1] → [write] ---> OUTPUT
             |           |             |            |
KILL_ALL-----o-----------o-------------o------------o-----[nothing to drain]
I believe that the above structure can be achieved by combining a custom duplex stream (for your own _write(chunk, encoding, cb) and _read(bytes) implementation) with a custom transform stream (for your own _transform(chunk, encoding, cb) implementation).
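For example, a minimal sketch of such a killable stage, built on the stock stream.Transform (kill() and _killed are invented names here, not part of the streams API):

var Transform = require('stream').Transform;
var util = require('util');

function KillableTransform(options) {
    Transform.call(this, options);
    this._killed = false; // set by the KILL_ALL signal
}
util.inherits(KillableTransform, Transform);

// broadcast target for KILL_ALL: once set, nothing else goes downstream
KillableTransform.prototype.kill = function() {
    this._killed = true;
};

KillableTransform.prototype._transform = function(chunk, encoding, cb) {
    if ( this._killed ) {
        return cb(); // swallow the chunk instead of letting it drain downstream
    }
    // ... normal per-chunk work goes here ...
    cb(null, chunk);
};

A controller that keeps a reference to every stage can then call kill() on each of them, so buffered chunks are discarded at every hop instead of draining into the final write target.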
Since you are using the writable-stream-parallel package, you may also want to go over their libs: their duplex implementation can be found here: https://github.com/Clever/writable-stream-parallel/blob/master/lib/duplex.js, and their transform stream implementation is here: https://github.com/Clever/writable-stream-parallel/blob/master/lib/transform.js. This is where they handle the highWaterMark.
Possible solution
Their write stream (https://github.com/Clever/writable-stream-parallel/blob/master/lib/writable.js#L189) has an interesting function, writeOrBuffer; I think you might be able to tweak it a bit to interrupt writing the data from the buffer.
Note: these 3 flags control the buffer clearing:
( !finished && !state.bufferProcessing && state.buffer.length )
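As an illustration only (this patches the library's internals, and state.killed is an invented flag, not something the package provides):

// hypothetical tweak: bail out of writeOrBuffer once an abort flag is set
function writeOrBuffer(stream, state, chunk, encoding, cb) {
    if ( state.killed ) { // assumed flag, set by your own abort logic
        return false; // drop the chunk: never write it, never buffer it
    }
    // ... the original writeOrBuffer logic continues here ...
}

The buffer-clearing loop would then need to check the same flag alongside the three conditions above, so that chunks already sitting in state.buffer are skipped rather than drained.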