 

Proper way to unpipe a streams2 pipeline and empty it (not just flush)

Premise

I'm trying to find the correct way to prematurely terminate a series of piped streams (a pipeline) in Node.js: sometimes I want to gracefully abort the stream before it has finished. Specifically, I'm dealing mostly with objectMode: true, non-native parallel streams, but this shouldn't really matter.

Problem

The problem is that when I unpipe the pipeline, data remains in each stream's buffer and is drained. This might be okay for most of the intermediate streams (e.g. Readable/Transform), but the last Writable still drains to its write target (e.g. a file, a database, a socket, etc.). This could be problematic if the buffer contains hundreds or thousands of chunks, which take a significant amount of time to drain. I want it to stop immediately, i.e. not drain; why waste cycles and memory on data that doesn't matter?
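To make the draining concrete, here is a minimal sketch (assuming objectMode streams; the source/sink names and timings are made up for illustration):

var stream = require('stream');

// a readable that produces 1000 objects
var i = 0;
var source = new stream.Readable({ objectMode: true });
source._read = function() {
  this.push(i < 1000 ? { n: i++ } : null);
};

// a deliberately slow writable with a large buffer,
// standing in for a file/db/socket
var sink = new stream.Writable({ objectMode: true, highWaterMark: 1000 });
sink._write = function(chunk, encoding, cb) {
  setTimeout(cb, 10); // simulate slow I/O on the write target
};

source.pipe(sink);

setTimeout(function() {
  source.unpipe(sink);
  // the sink's internal buffer still holds hundreds of chunks,
  // and they continue to drain to the write target
  console.log('still buffered after unpipe:', sink._writableState.length);
}, 50);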

Depending on the route I go, I receive either a "write after end" error, or an exception when the stream cannot find existing pipes.
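For reference, the "write after end" variant is easy to reproduce: it is what happens when anything (e.g. a still-attached pipe) writes into a Writable that has already been end()ed. A minimal sketch:

var stream = require('stream');

var w = new stream.Writable({ objectMode: true });
w._write = function(chunk, encoding, cb) { cb(); };

w.on('error', function(err) {
  console.log(err.message); // "write after end"
});

w.end();
w.write({}); // writing after end() emits the error above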

Question

What is the proper way to gracefully kill off a pipeline of streams in the form a.pipe(b).pipe(c).pipe(z)?

Solution?

The solution I have come up with is 3-step:

  1. Unpipe each stream in the pipeline, in reverse order
  2. Empty the buffer of each stream that implements Writable
  3. End each stream that implements Writable

Some pseudo code illustrating the entire process:

var pipeline = [ // define the pipeline
  readStream,
  transformStream0,
  transformStream1,
  writeStream
];

// build and start the pipeline
var tmpBuildStream;
pipeline.forEach(function(stream) {
    if ( !tmpBuildStream ) {
        tmpBuildStream = stream;
        return; // continue is invalid inside a forEach callback
    }
    tmpBuildStream = tmpBuildStream.pipe(stream);
});

// sleep, timeout, event, etc...

// tear down the pipeline
var tmpTearStream;
pipeline.slice(0).reverse().forEach(function(stream) {
    if ( !tmpTearStream ) {
        tmpTearStream = stream;
        return; // continue is invalid inside a forEach callback
    }
    tmpTearStream = stream.unpipe(tmpTearStream); // unpipe returns the source stream
});

// empty and end the pipeline
pipeline.forEach(function(stream) {
  if ( typeof stream._writableState === 'object' ) { // empty
    // assumes objectMode: each buffered entry counts as 1 toward length
    stream._writableState.length -= stream._writableState.buffer.length;
    stream._writableState.buffer = [];
  }
  if ( typeof stream.end === 'function' ) { // kill
    stream.end();
  }
});
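Note that the length -= buffer.length trick only balances out in objectMode, where each buffered entry counts as one toward length. For byte streams, a hypothetical helper (the name is mine) would need to subtract the buffered chunk sizes instead, while still poking at the same private internals:

// hypothetical helper; relies on the same private _writableState internals
function emptyWritableBuffer(stream) {
  var state = stream._writableState;
  if ( !state ) return;
  state.buffer.forEach(function(req) {
    // each buffered entry is a WriteReq holding the original chunk
    state.length -= state.objectMode ? 1 : req.chunk.length;
  });
  state.buffer = [];
}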

I'm really worried about the usage of stream._writableState and modifying the internal buffer and length properties (the _ prefix signifies a private property). This seems like a hack. Also note that since I'm piping, things like pause and resume are out of the question (based on a suggestion I received on IRC).

I also put together a runnable version (pretty sloppy) you can grab from github: https://github.com/zamnuts/multipipe-proto (git clone, npm install, view readme, npm start)

asked Nov 06 '14 by zamnuts

1 Answer

In this particular case I think we should get rid of the structure where you have four different, not fully customised streams. Piping them together creates a chain of dependencies that will be hard to control unless we implement our own mechanism.

I would like to focus on your actual goal here:

 INPUT >----[read] → [transform0] → [transform1] → [write]-----> OUTPUT
               |          |              |            |
 KILL_ALL------o----------o--------------o------------o--------[nothing to drain]

I believe that the above structure can be achieved by combining custom:

  1. duplex streams - with your own _write(chunk, encoding, cb) and _read(bytes) implementations, and

  2. transform streams - with your own _transform(chunk, encoding, cb) implementation (a sketch follows below).
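A minimal sketch of the second idea, assuming objectMode and a shared kill signal (the KillableTransform name and the 'kill' event are made up for illustration): every stage drops its chunks once the signal fires, so there is nothing meaningful left to drain:

var stream = require('stream');
var util = require('util');
var EventEmitter = require('events').EventEmitter;

var killSignal = new EventEmitter(); // one shared abort channel for all stages

function KillableTransform(options) {
  stream.Transform.call(this, options);
  this.killed = false;
  var self = this;
  killSignal.once('kill', function() {
    self.killed = true;
  });
}
util.inherits(KillableTransform, stream.Transform);

KillableTransform.prototype._transform = function(chunk, encoding, cb) {
  // once killed, swallow every chunk (including those already buffered
  // on the writable side) instead of pushing it downstream
  if ( this.killed ) return cb();
  this.push(chunk);
  cb();
};

// usage: build the pipeline from killable stages, then abort all of them at once:
// killSignal.emit('kill');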

Since you are using the writable-stream-parallel package, you may also want to go over its source. Its duplex implementation can be found here: https://github.com/Clever/writable-stream-parallel/blob/master/lib/duplex.js, and its transform stream implementation here: https://github.com/Clever/writable-stream-parallel/blob/master/lib/transform.js. This is where they handle the highWaterMark.

Possible solution

Their writable stream (https://github.com/Clever/writable-stream-parallel/blob/master/lib/writable.js#L189) has an interesting function, writeOrBuffer; I think you might be able to tweak it a bit to interrupt writing the data from the buffer.

Note: These three flags control whether the buffer is cleared:

( !finished && !state.bufferProcessing && state.buffer.length )
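As a hedged illustration (the helper is hypothetical and leans on the same private state as the question's workaround), an external interrupt could check exactly those three flags before discarding the queued writes:

// hypothetical helper mirroring the flags that guard buffer clearing
function interruptWrites(ws) {
  var state = ws._writableState;
  if ( !state.finished && !state.bufferProcessing && state.buffer.length ) {
    state.length -= state.buffer.length; // assumes objectMode
    state.buffer = [];                   // nothing left to drain
  }
}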

References:

  • Node.js Transform Stream Doc
  • Node.js Duplex Stream Doc
  • Writing Transform Stream in Node.js
  • Writing Duplex Stream in Node.js
answered Nov 15 '22 by Piotr Dajlido