I'd like to combine two Node.js streams into one by piping them, if possible. I'm using Transform streams.
In other words, I'd like my library to return myStream
for people to use. For example they could write:
process.stdin.pipe(myStream).pipe(process.stdout);
And internally I'm using a third-party vendorStream
that does some work, plugged into my own logic contained in myInternalStream
. So what's above would translate to:
process.stdin.pipe(vendorStream).pipe(myInternalStream).pipe(process.stdout);
Can I do something like that? I've tried var myStream = vendorStream.pipe(myInternalStream)
but that obviously doesn't work.
To make an analogy with bash
, let's say I want to write a program that checks if the letter h
is present in the last line of some stream (tail -n 1 | grep h
), I can create a shell script:
# myscript.sh
tail -n 1 | grep h
And then if people do:
$ printf "abc\ndef\nghi" | . myscript.sh
It just works.
This is what I have so far:
// Combine a pipe of two streams into one stream
var util = require('util')
, Transform = require('stream').Transform;
var chunks1 = [];
var stream1 = new Transform();
var soFar = '';
stream1._transform = function(chunk, encoding, done) {
chunks1.push(chunk.toString());
var pieces = (soFar + chunk).split('\n');
soFar = pieces.pop();
for (var i = 0; i < pieces.length; i++) {
var piece = pieces[i];
this.push(piece);
}
return done();
};
var chunks2 = [];
var count = 0;
var stream2 = new Transform();
stream2._transform = function(chunk, encoding, done) {
chunks2.push(chunk.toString());
count = count + 1;
this.push(count + ' ' + chunk.toString() + '\n');
done();
};
var stdin = process.stdin;
var stdout = process.stdout;
process.on('exit', function () {
console.error('chunks1: ' + JSON.stringify(chunks1));
console.error('chunks2: ' + JSON.stringify(chunks2));
});
process.stdout.on('error', process.exit);
// stdin.pipe(stream1).pipe(stream2).pipe(stdout);
// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc\nd","ef\nghi\n"]
// chunks2: ["abc","def","ghi"]
// Best working solution I could find
var stream3 = function(src) {
return src.pipe(stream1).pipe(stream2);
};
stream3(stdin).pipe(stdout);
// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc\nd","ef\nghi\n"]
// chunks2: ["abc","def","ghi"]
Is this at all possible? Let me know if what I'm trying to do isn't clear.
Thanks!
We can use Duplex streams to set a delay of when the data is brought into our application: const { PassThrough, Duplex } = require("stream"); const { createReadStream, createWriteStream } = require("fs"); const readStream = createReadStream("./movie. mp4"); const writeStream = createWriteStream("./copy.
pipe() method in a Readable Stream is used to attach a Writable stream to the readable stream so that it consequently switches into flowing mode and then pushes all the data that it has to the attached Writable.
Chaining is a mechanism to connect the output of one stream to another stream and create a chain of multiple stream operations. It is normally used with piping operations. Now we'll use piping and chaining to first compress a file and then decompress the same.
You can watch for something to be piped to your stream, and then unpipe
it and pipe it to the streams you're interested in:
var PassThrough = require('stream').PassThrough;
var stream3 = new PassThrough();
// When a source stream is piped to us, undo that pipe, and save
// off the source stream piped into our internally managed streams.
stream3.on('pipe', function(source) {
source.unpipe(this);
this.transformStream = source.pipe(stream1).pipe(stream2);
});
// When we're piped to another stream, instead pipe our internal
// transform stream to that destination.
stream3.pipe = function(destination, options) {
return this.transformStream.pipe(destination, options);
};
stdin.pipe(stream3).pipe(stdout);
You can extract this functionality into your own constructable stream class:
var util = require('util');
var PassThrough = require('stream').PassThrough;
var StreamCombiner = function() {
this.streams = Array.prototype.slice.apply(arguments);
this.on('pipe', function(source) {
source.unpipe(this);
for(i in this.streams) {
source = source.pipe(this.streams[i]);
}
this.transformStream = source;
});
};
util.inherits(StreamCombiner, PassThrough);
StreamCombiner.prototype.pipe = function(dest, options) {
return this.transformStream.pipe(dest, options);
};
var stream3 = new StreamCombiner(stream1, stream2);
stdin.pipe(stream3).pipe(stdout);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With