I want to pipe data from an Amazon Kinesis stream to an S3 log or a bunyan log.
The sample works with a file write stream or stdout. How would I implement my own writable stream?
    // this works
    var file = fs.createWriteStream('my.log')
    kinesisSource.pipe(file)
This doesn't work; it fails saying the object has no method 'on':
    var stream = {}; // process.stdout works, however
    stream.writable = true;
    stream.write = function (data) {
      console.log(data);
    };
    kinesisSource.pipe(stream);
What methods do I have to implement for my own custom writable stream? The docs seem to indicate I need to implement 'write' and not 'on'.
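The "no method 'on'" error arises because pipe() attaches event listeners to its destination, so the destination has to be an event emitter and not just an object with a write() function. The following is a minimal sketch of that difference; the names `plain` and `real` are illustrative only and do not come from the question.

```javascript
const { Writable } = require('stream');
const { EventEmitter } = require('events');

// A plain object with only write() has no on()/once()/emit(),
// which pipe() needs in order to attach its listeners.
const plain = { writable: true, write(data) { console.log(data); } };
const hasOnPlain = typeof plain.on === 'function';

// A Writable inherits those methods from EventEmitter.
const real = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});
const hasOnReal = typeof real.on === 'function';
const isEmitter = real instanceof EventEmitter;

console.log(hasOnPlain, hasOnReal, isEmitter);
```

This is why building on stream.Writable is usually simpler than hand-rolling the interface: the event plumbing comes for free.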
To write data to a writable stream, call write() on the stream instance, as in the following example: var fs = require('fs'); var readableStream = fs.createReadStream('file1.
A Writable stream emits the 'pipe' event when stream.pipe() is called on a readable stream, adding this writable to its set of destinations. The event is emitted only when pipe() is called, and the listener receives the source readable stream as its argument.
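As a rough illustration of the 'pipe' event, the sketch below attaches a listener to a writable and then pipes a small readable into it. The names `sink`, `source`, and `pipedFrom` are made up for this example.

```javascript
const { Readable, Writable } = require('stream');

// A writable that simply discards whatever it receives.
const sink = new Writable({
  write(chunk, encoding, callback) { callback(); }
});

// Remember which readable was piped into the sink.
let pipedFrom = null;
sink.on('pipe', (src) => { pipedFrom = src; });

// A tiny readable that produces one chunk and then ends.
const source = new Readable({
  read() {
    this.push('hello');
    this.push(null);
  }
});

source.pipe(sink);
console.log(pipedFrom === source);
```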
A writable stream is an abstraction for a destination to which data can be written. An example of that is the stream returned by the fs.createWriteStream method. A duplex stream is both Readable and Writable. An example of that is a TCP socket.
The WritableStream interface of the Streams API provides a standard abstraction for writing streaming data to a destination, known as a sink. This object comes with built-in backpressure and queuing. WritableStream is a transferable object.
To create your own writable stream, you have three possibilities.
The first is to create your own class. For this you'll need to 1) extend the Writable class, 2) call the Writable constructor in your own constructor, and 3) define a _write() method on the prototype of your stream object.
Here's an example:
    var stream = require('stream');
    var util = require('util');

    function EchoStream () { // step 2
      stream.Writable.call(this);
    };
    util.inherits(EchoStream, stream.Writable); // step 1
    EchoStream.prototype._write = function (chunk, encoding, done) { // step 3
      console.log(chunk.toString());
      done();
    }

    var myStream = new EchoStream(); // instantiate your brand new stream
    process.stdin.pipe(myStream);
Instead of defining a new object type, you can instantiate an empty Writable object and implement the _write() method:
    var stream = require('stream');
    var echoStream = new stream.Writable();
    echoStream._write = function (chunk, encoding, done) {
      console.log(chunk.toString());
      done();
    };

    process.stdin.pipe(echoStream);
If you're using io.js or Node.js 4 or later, you can use the simplified constructor API:
    var stream = require('stream');
    var writable = new stream.Writable({
      write: function (chunk, encoding, next) {
        console.log(chunk.toString());
        next();
      }
    });
With ES6 classes, the first approach can be written as:
    var stream = require('stream');
    class EchoStream extends stream.Writable {
      _write(chunk, enc, next) {
        console.log(chunk.toString());
        next();
      }
    }
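For the original goal of piping into a log, the same pattern can wrap any logging function. Below is a minimal sketch, assuming a hypothetical `createLogStream(logFn)` helper that is not part of Node or of any library mentioned here; `logFn` stands in for whatever actually forwards a record to the log. An error thrown by `logFn` is passed to the callback so the stream reports it through its 'error' event.

```javascript
const { Writable } = require('stream');

// Hypothetical helper: wraps any `logFn(line)` in a Writable.
function createLogStream(logFn) {
  return new Writable({
    write(chunk, encoding, callback) {
      try {
        logFn(chunk.toString()); // forward the record to the log
        callback();              // signal that this chunk is done
      } catch (err) {
        callback(err);           // surfaces as an 'error' event
      }
    }
  });
}

// Usage: collect records in an array instead of a real log.
const lines = [];
const logStream = createLogStream((line) => lines.push(line));
logStream.write('first record');
logStream.write('second record');
logStream.end();
```

A readable source could then be connected with `source.pipe(createLogStream(myLogFunction))`, where `myLogFunction` is whatever the application uses for logging. If the log call is asynchronous, the callback should be invoked only once that call has completed, so that backpressure works as intended.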
Actually, creating a writable stream is quite simple. Here is an example:
    var fs = require('fs');
    var Stream = require('stream');

    var ws = new Stream;
    ws.writable = true;
    ws.bytes = 0;

    ws.write = function (buf) {
      ws.bytes += buf.length;
    }

    ws.end = function (buf) {
      if (arguments.length) ws.write(buf);
      ws.writable = false;
      console.log('bytes length: ' + ws.bytes);
    }

    fs.createReadStream('file path').pipe(ws);
Also, if you want to create your own class, @Paul gives a good answer.
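For comparison, the same byte counter can be sketched on top of stream.Writable, which handles buffering and the end-of-stream bookkeeping itself. This is only one possible variant; `createByteCounter` is a made-up name, and the `final` option assumes a reasonably recent Node.js version.

```javascript
const { Writable } = require('stream');

// Hypothetical helper: a Writable that counts the bytes it receives.
function createByteCounter() {
  const counter = new Writable({
    write(chunk, encoding, callback) {
      counter.bytes += chunk.length; // chunk is a Buffer by default
      callback();
    },
    final(callback) {
      // Runs once, after end() has been called and all writes are done.
      console.log('bytes length: ' + counter.bytes);
      callback();
    }
  });
  counter.bytes = 0;
  return counter;
}

const byteCounter = createByteCounter();
byteCounter.write('hello');
byteCounter.end(' world');
```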