Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Filtering Stream object in node.js

It seems to me that an elegant way to process certain kinds of data in Node.js would be to chain processing objects, like UNIX pipes.

For example, grep:

function Grep(pattern) {
    ...
}
util.inherits(Grep, stream.Stream);

Grep.prototype.???? = ???????  // What goes here?

grep = new Grep(/foo/);

process.stdin.pipe(grep);
myStream.pipe(process.stdout);

However it's not at all clear to me how the various Stream methods need to be overridden in order for this to work.

How can I create a Stream object that simply copies from its input to its output? Presumably with that answered, more sophisticated filtering streams become trivial.

Update: it feels as if the following should work (expressed in CoffeeScript, so I don't fill this box with JS syntax!):

class Forwarder extends stream.Stream
    write: (chunk, encoding) ->
        @emit 'data', chunk
    end: (chunk, encoding) =>
        if chunk?
            @emit 'data', chunk
        @emit 'end'

fwd = new Forwarder()
fwd.pipe(process.stdout);
process.stdin.pipe(fwd);
process.stdin.resume();

However catting something to this script doesn't output anything. Calling fwd.write() explicitly in the script does cause output on stdout.

like image 262
slim Avatar asked Aug 04 '11 15:08

slim


People also ask

How do you make an object readable stream?

To implement a readable stream, we require the Readable interface, and construct an object from it, and implement a read() method in the stream's configuration parameter: const { Readable } = require('stream'); const inStream = new Readable({ read() {} }); There is a simple way to implement readable streams.

Which object is stream in node JS?

The Stream module is a native module that shipped by default in Node. js. The Stream is an instance of the EventEmitter class which handles events asynchronously in Node. Because of this, streams are inherently event-based.

What is stream PassThrough ()?

PassThrough. This Stream is a trivial implementation of a Transform stream that simply passes the input bytes across to the output. This is mainly for testing and some other trivial use cases. Here is an example of Passthrough Stream where it is piping from readable stream to writable stream.

What is Libuv in node JS?

libuv is a multi-platform C library that provides support for asynchronous I/O based on event loops. It supports epoll(4) , kqueue(2) , Windows IOCP, and Solaris event ports. It is primarily designed for use in Node. js but it is also used by other software projects.


2 Answers

You are so very close.

Because you are using the very low-level stream class, you need to set the stream writable property to make it a writable stream. If you were reading from the stream, you'd need to set the readable property. Also the end event doesn't have any arguments.

class Forwarder extends stream.Stream
  constructor: ->
    @writable = true
  write: (chunk, encoding) ->
    @emit 'data', chunk
  end: ->
    @emit 'end'

fwd = new Forwarder()
fwd.pipe(process.stdout);
process.stdin.pipe(fwd);
process.stdin.resume();

Update

The answer above applied to V1 streams in Node <= 0.8. If you are using > 0.8, Node has added more specific classes that are designed to be extended, so you would use something more like this:

class Forwarder extends stream.Transform
    _transform: (chunk, encoding, callback) ->
      this.push(chunk);
      callback();

Processing chunk and pushing the pieces you actually want.

like image 129
loganfsmyth Avatar answered Sep 24 '22 02:09

loganfsmyth


Although the answer that exists is nice, it still requires a bit of digging around on behalf of those looking for answers.

The following code completes the example the OP gave, using the Node 0.10 stream API.

var stream = require('stream')
var util = require('util')

function Grep(pattern) {
  stream.Transform.call(this)

  this.pattern = pattern
}

util.inherits(Grep, stream.Transform)

Grep.prototype._transform = function(chunk, encoding, callback) {
  var string = chunk.toString()

  if (string.match(this.pattern)) {
    this.push(chunk)
  }

  callback()
}

var grep = new Grep(/foo/)
process.stdin.pipe(grep)
grep.pipe(process.stdout)
like image 32
deceleratedcaviar Avatar answered Sep 22 '22 02:09

deceleratedcaviar