Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

What's the proper way to handle back-pressure in a node.js Transform stream?

##Intro These are my first adventures in writing the node.js server side. It's been fun so far but I'm having some difficulty understanding the proper way to implement something as it relates to node.js streams.

###Problem For test and learning purposes I'm working with large files whose content is zlib compressed. The compressed content is binary data, each packet being 38 bytes in length. I'm trying to create a resulting file that looks almost identical to the original file except that there is an uncompressed 31-byte header for every 1024 38-byte packets.

###original file content (decompressed)

+----------+----------+----------+----------+
| packet 1 | packet 2 |  ......  | packet N |
| 38 bytes | 38 bytes |  ......  | 38 bytes |
+----------+----------+----------+----------+

###resulting file content

+----------+--------------------------------+----------+--------------------------------+
| header 1 |    1024 38 byte packets        | header 2 |    1024 38 byte packets        |
| 31 bytes |       zlib compressed          | 31 bytes |       zlib compressed          |
+----------+--------------------------------+----------+--------------------------------+

As you can see, it's somewhat of a translation problem. This means, I'm taking some source stream as input and then slightly transforming it into some output stream. Therefore, it felt natural to implement a Transform stream.

The class simply attempts to accomplish the following:

  1. Takes stream as input
  2. zlib inflates the chunks of data to count the number of packets, putting together 1024 of them, zlib deflating, and prepending a header.
  3. Passes the new resulting chunk on through the pipeline via this.push(chunk).

A use case would be something like:

var fs = require('fs');
var me = require('./me'); // Where my Transform stream code sits
var inp = fs.createReadStream('depth_1000000');
var out = fs.createWriteStream('depth_1000000.out');
inp.pipe(me.createMyTranslate()).pipe(out);

###Question(s) Assuming Transform is a good choice for this use case, I seem to be running into a possible back-pressure issue. My call to this.push(chunk) within _transform keeps returning false. Why would this be and how to handle such things?

like image 403
Scott Saad Avatar asked Dec 25 '13 04:12

Scott Saad


5 Answers

This question from 2013 is all I was able to find on how to deal with "back pressure" when creating node Transform streams.

From the node 7.10.0 Transform stream and Readable stream documentation what I gathered was that once push returned false, nothing else should be pushed until _read was called.

The Transform documentation doesn't mention _read except to mention that the base Transform class implements it (and _write). I found the information about push returning false and _read being called in the Readable stream documentation.

The only other authoritative comment I found on Transform back pressure only mentioned it as an issue, and that was in a comment at the top of the node file _stream_transform.js.

Here's the section about back pressure from that comment:

// This way, back-pressure is actually determined by the reading side,
// since _read has to be called to start processing a new chunk.  However,
// a pathological inflate type of transform can cause excessive buffering
// here.  For example, imagine a stream where every byte of input is
// interpreted as an integer from 0-255, and then results in that many
// bytes of output.  Writing the 4 bytes {ff,ff,ff,ff} would result in
// 1kb of data being output.  In this case, you could write a very small
// amount of input, and end up with a very large amount of output.  In
// such a pathological inflating mechanism, there'd be no way to tell
// the system to stop doing the transform.  A single 4MB write could
// cause the system to run out of memory.
//
// However, even in such a pathological case, only a single written chunk
// would be consumed, and then the rest would wait (un-transformed) until
// the results of the previous transformed chunk were consumed.

Solution example

Here's the solution I pieced together to handle the back pressure in a Transform stream which I'm pretty sure works. (I haven't written any real tests, which would require writing a Writable stream to control the back pressure.)

This is a rudimentary Line transform which needs work as a line transform but does demonstrate handling the "back pressure".

const stream = require('stream');

class LineTransform extends stream.Transform
{
    constructor(options)
    {
        super(options);

        this._lastLine = "";
        this._continueTransform = null;
        this._transforming = false;
        this._debugTransformCallCount = 0;
    }

    _transform(chunk, encoding, callback)
    {
        if (encoding === "buffer")
            return callback(new Error("Buffer chunks not supported"));

        if (this._continueTransform !== null)
            return callback(new Error("_transform called before previous transform has completed."));

        // DEBUG: Uncomment for debugging help to see what's going on
        //console.error(`${++this._debugTransformCallCount} _transform called:`);

        // Guard (so we don't call _continueTransform from _read while it is being
        // invoked from _transform)
        this._transforming = true;

        // Do our transforming (in this case splitting the big chunk into lines)
        let lines = (this._lastLine + chunk).split(/\r\n|\n/);
        this._lastLine = lines.pop();

        // In order to respond to "back pressure" create a function
        // that will push all of the lines stopping when push returns false,
        // and then resume where it left off when called again, only calling
        // the "callback" once all lines from this transform have been pushed.
        // Resuming (until done) will be done by _read().
        let nextLine = 0;
        this._continueTransform = () =>
            {
                let backpressure = false;
                while (nextLine < lines.length)
                {

                    if (!this.push(lines[nextLine++] + "\n"))
                    {
                        // we've got more to push, but we got backpressure so it has to wait.
                        if (backpressure)
                            return;

                        backpressure = !this.push(lines[nextLine++] + "\n");
                    }
                }

                // DEBUG: Uncomment for debugging help to see what's going on
                //console.error(`_continueTransform ${this._debugTransformCallCount} finished\n`);

                // All lines are pushed, remove this function from the LineTransform instance
                this._continueTransform = null;
                return callback();
            };

        // Start pushing the lines
        this._continueTransform();

        // Turn off guard allowing _read to continue the transform pushes if needed.
        this._transforming = false;
    }

    _flush(callback)
    {
        if (this._lastLine.length > 0)
        {
            this.push(this._lastLine);
            this._lastLine = "";
        }

        return callback();
    }

    _read(size)
    {
        // DEBUG: Uncomment for debugging help to see what's going on
        //if (this._transforming)
        //    console.error(`_read called during _transform ${this._debugTransformCallCount}`);

        // If a transform has not pushed every line yet, continue that transform
        // otherwise just let the base class implementation do its thing.
        if (!this._transforming && this._continueTransform !== null)
            this._continueTransform();
        else
            super._read(size);
    }
}

I tested the above by running it with the DEBUG lines uncommented on a ~10000 line ~200KB file. Redirect stdout or stderr to a file (or both) to separate the debugging statements from the expected output. (node test.js > out.log 2> err.log)

const fs = require('fs');
let inStrm = fs.createReadStream("testdata/largefile.txt", { encoding: "utf8" });
let lineStrm = new LineTransform({ encoding: "utf8", decodeStrings: false });
inStrm.pipe(lineStrm).pipe(process.stdout);

Helpful debugging hint

While writing this initially I didn't realize that _read could be called before _transform returned, so I hadn't implemented the this._transforming guard and I was getting the following error:

Error: no writecb in Transform class
    at afterTransform (_stream_transform.js:71:33)
    at TransformState.afterTransform (_stream_transform.js:54:12)
    at LineTransform._continueTransform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:44:13)
    at LineTransform._transform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:46:21)
    at LineTransform.Transform._read (_stream_transform.js:167:10)
    at LineTransform._read (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:56:15)
    at LineTransform.Transform._write (_stream_transform.js:155:12)
    at doWrite (_stream_writable.js:331:12)
    at writeOrBuffer (_stream_writable.js:317:5)
    at LineTransform.Writable.write (_stream_writable.js:243:11)

Looking at the node implementation I realized that this error meant that the callback given to _transform was being called more than once. There wasn't much information to be found about this error either so I thought I'd include what I figured out here.

like image 91
Mike Lippert Avatar answered Oct 05 '22 21:10

Mike Lippert


I think Transform is suitable for this, but I would perform the inflate as a separate step in the pipeline.

Here's a quick and largely untested example:

var zlib        = require('zlib');
var stream      = require('stream');
var transformer = new stream.Transform();

// Properties used to keep internal state of transformer.
transformer._buffers    = [];
transformer._inputSize  = 0;
transformer._targetSize = 1024 * 38;

// Dump one 'output packet'
transformer._dump       = function(done) {
  // concatenate buffers and convert to binary string
  var buffer = Buffer.concat(this._buffers).toString('binary');

  // Take first 1024 packets.
  var packetBuffer = buffer.substring(0, this._targetSize);

  // Keep the rest and reset counter.
  this._buffers   = [ new Buffer(buffer.substring(this._targetSize)) ];
  this._inputSize = this._buffers[0].length;

  // output header
  this.push('HELLO WORLD');

  // output compressed packet buffer
  zlib.deflate(packetBuffer, function(err, compressed) {
    // TODO: handle `err`
    this.push(compressed);
    if (done) {
      done();
    }
  }.bind(this));
};

// Main transformer logic: buffer chunks and dump them once the
// target size has been met.
transformer._transform  = function(chunk, encoding, done) {
  this._buffers.push(chunk);
  this._inputSize += chunk.length;

  if (this._inputSize >= this._targetSize) {
    this._dump(done);
  } else {
    done();
  }
};

// Flush any remaining buffers.
transformer._flush = function() {
  this._dump();
};

// Example:
var fs = require('fs');
fs.createReadStream('depth_1000000')
  .pipe(zlib.createInflate())
  .pipe(transformer)
  .pipe(fs.createWriteStream('depth_1000000.out'));
like image 38
robertklep Avatar answered Oct 05 '22 23:10

robertklep


push will return false if the stream you are writing to (in this case, a file output stream) has too much data buffered. Since you're writing to disk, this makes sense: you are processing data faster than you can write it out.

When out's buffer is full, your transform stream will fail to push, and start buffering data itself. If that buffer should fill, then inp's will start to fill. This is how things should be working. The piped streams are only going to process data as fast as the slowest link in the chain can handle it (once your buffers are full).

like image 38
mako-taco Avatar answered Oct 05 '22 21:10

mako-taco


Ran into a similar problem lately, needing to handle backpressure in an inflating transform stream - the secret to handling push() returning false is to register and handle the 'drain' event on the stream

_transform(data, enc, callback) {
  const continueTransforming = () => {
    // ... do some work / parse the data, keep state of where we're at etc
    if(!this.push(event)) 
         this._readableState.pipes.once('drain', continueTransforming); // will get called again when the reader can consume more data
    if(allDone)
       callback();
  }
  continueTransforming()
}

NOTE this is a bit hacky as we're reaching into the internals and pipes can even be an array of Readables but it does work in the common case of ....pipe(transform).pipe(...

Would be great if someone from the Node community can suggest a "correct" method for handling .push() returning false

like image 41
Ledion Avatar answered Oct 05 '22 23:10

Ledion


I ended up following Ledion's example and created a utility Transform class which assists with backpressure. The utility adds an async method named addData, which the implementing Transform can await.

'use strict';

const { Transform } = require('stream');

/**
 * The BackPressureTransform class adds a utility method addData which
 * allows for pushing data to the Readable, while honoring back-pressure.
 */
class BackPressureTransform extends Transform {
  constructor(...args) {
    super(...args);
  }

  /**
   * Asynchronously add a chunk of data to the output, honoring back-pressure.
   *
   * @param {String} data
   * The chunk of data to add to the output.
   *
   * @returns {Promise<void>}
   * A Promise resolving after the data has been added.
   */
  async addData(data) {
    // if .push() returns false, it means that the readable buffer is full
    // when this occurs, we must wait for the internal readable to emit
    // the 'drain' event, signalling the readable is ready for more data
    if (!this.push(data)) {
      await new Promise((resolve, reject) => {
        const errorHandler = error => {
          this.emit('error', error);
          reject();
        };
        const boundErrorHandler = errorHandler.bind(this);

        this._readableState.pipes.on('error', boundErrorHandler);
        this._readableState.pipes.once('drain', () => {
          this._readableState.pipes.removeListener('error', boundErrorHandler);
          resolve();
        });
      });
    }
  }
}

module.exports = {
  BackPressureTransform
};

Using this utility class, my Transforms look like this now:

'use strict';

const { BackPressureTransform } = require('./back-pressure-transform');

/**
 * The Formatter class accepts the transformed row to be added to the output file.
 * The class provides generic support for formatting the result file.
 */
class Formatter extends BackPressureTransform {
  constructor() {
    super({
      encoding: 'utf8',
      readableObjectMode: false,
      writableObjectMode: true
    });

    this.anyObjectsWritten = false;
  }

  /**
   * Called when the data pipeline is complete.
   *
   * @param {Function} callback
   * The function which is called when final processing is complete.
   *
   * @returns {Promise<void>}
   * A Promise resolving after the flush completes.
   */
  async _flush(callback) {
    // if any object is added, close the surrounding array
    if (this.anyObjectsWritten) {
      await this.addData('\n]');
    }

    callback(null);
  }

  /**
   * Given the transformed row from the ETL, format it to the desired layout.
   *
   * @param {Object} sourceRow
   * The transformed row from the ETL.
   *
   * @param {String} encoding
   * Ignored in object mode.
   *
   * @param {Function} callback
   * The callback function which is called when the formatting is complete.
   *
   * @returns {Promise<void>}
   * A Promise resolving after the row is transformed.
   */
  async _transform(sourceRow, encoding, callback) {
    // before the first object is added, surround the data as an array
    // between each object, add a comma separator
    await this.addData(this.anyObjectsWritten ? ',\n' : '[\n');

    // update state
    this.anyObjectsWritten = true;

    // add the object to the output
    const parsed = JSON.stringify(sourceRow, null, 2).split('\n');
    for (const [index, row] of parsed.entries()) {
      // prepend the row with 2 additional spaces since we're inside a larger array
      await this.addData(`  ${row}`);

      // add line breaks except for the last row
      if (index < parsed.length - 1) {
        await this.addData('\n');
      }
    }

    callback(null);
  }
}

module.exports = {
  Formatter
};
like image 32
Michael P. Scott Avatar answered Oct 05 '22 22:10

Michael P. Scott