It seems to me that an elegant way to process certain kinds of data in Node.js would be to chain processing objects, like UNIX pipes.
For example, grep:
function Grep(pattern) {
  ...
}
util.inherits(Grep, stream.Stream);

Grep.prototype.???? = ???????  // What goes here?
grep = new Grep(/foo/);
process.stdin.pipe(grep);
grep.pipe(process.stdout);
However, it's not at all clear to me how the various Stream methods need to be overridden in order for this to work.
How can I create a Stream object that simply copies from its input to its output? Presumably with that answered, more sophisticated filtering streams become trivial.
Update: it feels as if the following should work (expressed in CoffeeScript, so I don't fill this box with JS syntax!):
class Forwarder extends stream.Stream
  write: (chunk, encoding) ->
    @emit 'data', chunk
  end: (chunk, encoding) =>
    if chunk?
      @emit 'data', chunk
    @emit 'end'

fwd = new Forwarder()
fwd.pipe(process.stdout);
process.stdin.pipe(fwd);
process.stdin.resume();
However, catting something to this script doesn't output anything. Calling fwd.write() explicitly in the script does cause output on stdout.
There is a simple way to implement readable streams: require the Readable interface, construct an object from it, and supply a read() method in the stream's configuration parameter:

const { Readable } = require('stream');
const inStream = new Readable({
  read() {}
});
The Stream module is a native module that ships by default with Node.js. A stream is an instance of the EventEmitter class, which handles events asynchronously in Node; because of this, streams are inherently event-based.
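For instance, a minimal sketch of consuming a readable stream purely through its events (using process.stdin as the source) might look like this:

// Readable streams emit 'data' for each chunk and 'end' once the input is exhausted.
process.stdin.on('data', function(chunk) {
  console.log('received %d bytes', chunk.length);
});
process.stdin.on('end', function() {
  console.log('no more data');
});
process.stdin.resume();  // older Node versions need an explicit resume() to start the flow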
PassThrough is a trivial implementation of a Transform stream that simply passes the input bytes across to the output; it exists mainly for testing and a few other trivial use cases. Here is an example of a PassThrough stream piping from a readable stream to a writable stream.
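A minimal sketch, assuming stdin as the readable stream and stdout as the writable one (PassThrough is available from Node 0.10 onwards):

var stream = require('stream');

var pass = new stream.PassThrough();  // copies whatever is written into it straight out to its readers

process.stdin.pipe(pass).pipe(process.stdout);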
You are so very close.
Because you are using the very low-level Stream class, you need to set the stream's writable property to make it a writable stream. If you were reading from the stream, you'd need to set the readable property. Also, the end event doesn't have any arguments.
class Forwarder extends stream.Stream
  constructor: ->
    @writable = true
  write: (chunk, encoding) ->
    @emit 'data', chunk
  end: ->
    @emit 'end'

fwd = new Forwarder()
fwd.pipe(process.stdout);
process.stdin.pipe(fwd);
process.stdin.resume();
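For anyone not using CoffeeScript, a rough plain-JavaScript equivalent of the same idea (same low-level Stream class, same manually emitted events) might look something like this:

var stream = require('stream');
var util = require('util');

function Forwarder() {
  stream.Stream.call(this);
  this.writable = true;           // mark the stream as writable so pipe() will write to it
}
util.inherits(Forwarder, stream.Stream);

Forwarder.prototype.write = function(chunk, encoding) {
  this.emit('data', chunk);       // forward every chunk unchanged
  return true;                    // signal that more data may be written immediately
};

Forwarder.prototype.end = function() {
  this.emit('end');
};

var fwd = new Forwarder();
fwd.pipe(process.stdout);
process.stdin.pipe(fwd);
process.stdin.resume();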
The answer above applied to V1 streams in Node <= 0.8. If you are using > 0.8, Node has added more specific classes that are designed to be extended, so you would use something more like this:
class Forwarder extends stream.Transform
  _transform: (chunk, encoding, callback) ->
    this.push(chunk);
    callback();
In a real filter you would process the chunk here and push only the pieces you actually want.
Although the existing answer is nice, it still requires a bit of digging around on the part of those looking for answers.
The following code completes the example the OP gave, using the Node 0.10 stream API.
var stream = require('stream')
var util = require('util')

function Grep(pattern) {
  stream.Transform.call(this)
  this.pattern = pattern
}
util.inherits(Grep, stream.Transform)

Grep.prototype._transform = function(chunk, encoding, callback) {
  var string = chunk.toString()
  if (string.match(this.pattern)) {
    this.push(chunk)
  }
  callback()
}

var grep = new Grep(/foo/)
process.stdin.pipe(grep)
grep.pipe(process.stdout)
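Note that this transform matches per chunk, not per line: any chunk containing the pattern is pushed through whole. If you want line-by-line behaviour closer to the real grep, a rough sketch of an alternative _transform (which, for simplicity, assumes chunks end on line boundaries and does not buffer a trailing partial line across chunks) could look like this:

Grep.prototype._transform = function(chunk, encoding, callback) {
  // Split the chunk into lines and push only the matching ones.
  // Simplification: a line split across two chunks will not be matched correctly;
  // a robust version would keep the trailing partial line in a buffer.
  var lines = chunk.toString().split('\n')
  for (var i = 0; i < lines.length; i++) {
    if (lines[i].match(this.pattern)) {
      this.push(lines[i] + '\n')
    }
  }
  callback()
}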