Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Transform stream to prepend string to each line

I spawn a child process like so:

const n = cp.spawn('bash');

n.stdout.pipe(process.stdout);
n.stderr.pipe(process.stderr);

I am looking for a transform stream so that I can prepend something like '[child process]' to the beginning of each line from the child, so I know that the stdio is coming from the child versus the parent process.

So it would look like:

const getTransformPrepender = function() : Transform {
   return ...
}

n.stdout.pipe(getTransformPrepender('[child]')).pipe(process.stdout);
n.stderr.pipe(getTransformPrepender('[child]')).pipe(process.stderr);

does anyone know if there is an existing transform package like this or how to write one?

I have this:

import * as stream from 'stream';


export default function(pre: string){

  let saved = '';

  return new stream.Transform({

    transform(chunk, encoding, cb) {

      cb(null, String(pre) + String(chunk));
    },

    flush(cb) {
      this.push(saved);
      cb();
    }

  });

}

but I am afraid it won't work in edge cases - where one chunk burst may not comprise an entire line (for very long lines).

It looks like the answer to this is here: https://strongloop.com/strongblog/practical-examples-of-the-new-node-js-streams-api/

but with this addendum: https://twitter.com/the1mills/status/886340747275812865

like image 392
Alexander Mills Avatar asked Jun 20 '17 22:06

Alexander Mills


2 Answers

There are in total three cases that you need to correctly handle:

  • A single chunk representing an entire line
  • A single chunk representing multiple lines
  • A single chunk representing only part of the line

Here is an algorithm description to solve all three situations

  1. Receive a chunk of data
  2. Scan the chunk for newlines
  3. As soon as a newline is found, take everything before it (including the newline) and send it out as a single line entry with any modifications you need
  4. Repeat until the whole chunk has been processed (no remaining data) or until no additional newlines have been found (some data remains, save it for later)

And here is an actual implementation with descriptions of why it is needed etc.

Please note that for performance reasons, I am not converting the Buffers into classic JS strings.

const { Transform } = require('stream')

const prefix = Buffer.from('[worker]: ')

const prepender = new Transform({
  transform(chunk, encoding, done) {
    this._rest = this._rest && this._rest.length
      ? Buffer.concat([this._rest, chunk])
      : chunk

    let index

    // As long as we keep finding newlines, keep making slices of the buffer and push them to the
    // readable side of the transform stream
    while ((index = this._rest.indexOf('\n')) !== -1) {
      // The `end` parameter is non-inclusive, so increase it to include the newline we found
      const line = this._rest.slice(0, ++index)
      // `start` is inclusive, but we are already one char ahead of the newline -> all good
      this._rest = this._rest.slice(index)
      // We have a single line here! Prepend the string we want
      this.push(Buffer.concat([prefix, line]))
    }

    return void done()
  },

  // Called before the end of the input so we can handle any remaining 
  // data that we have saved
  flush(done) {
    // If we have any remaining data in the cache, send it out
    if (this._rest && this._rest.length) {
      return void done(null, Buffer.concat([prefix, this._rest])
    }
  },
})

process.stdin.pipe(prepender).pipe(process.stdout)
like image 100
Robert Rossmann Avatar answered Oct 01 '22 18:10

Robert Rossmann


You can prepend to a stream using:

https://github.com/ORESoftware/prepend-transform

but it's designed to solve the problem at hand like so:

import pt from 'prepend-transform';
import * as cp from 'child_process';

const n = cp.spawn('bash');

n.stdout.pipe(pt('child stdout: ')).pipe(process.stdout);
n.stderr.pipe(pt('child stderr: ')).pipe(process.stderr);
like image 21
Alexander Mills Avatar answered Oct 01 '22 16:10

Alexander Mills