I need to run two commands in series that both read data from the same stream. Once a stream has been piped into another, its buffer is drained, so I can't read from it a second time, and this doesn't work:
var spawn = require('child_process').spawn;
var fs = require('fs');
var request = require('request');

var inputStream = request('http://placehold.it/640x360');
var identify = spawn('identify', ['-']);

inputStream.pipe(identify.stdin);

var chunks = [];
identify.stdout.on('data', function (chunk) {
  chunks.push(chunk);
});
identify.stdout.on('end', function () {
  var size = getSize(Buffer.concat(chunks)); // width
  var convert = spawn('convert', ['-', '-scale', String(size * 0.5), 'png:-']);
  inputStream.pipe(convert.stdin); // second pipe on the same stream fails
  convert.stdout.pipe(fs.createWriteStream('half.png'));
});

function getSize(buffer) {
  return parseInt(buffer.toString().split(' ')[2].split('x')[0], 10);
}
Request complains about this:
Error: You cannot pipe after data has been emitted from the response.
and changing the inputStream to fs.createReadStream yields the same issue, of course. I don't want to write to a file; I want to reuse, in some way, the stream that request produces (or any other readable stream, for that matter).
Is there a way to reuse a readable stream once it finishes piping? What would be the best way to accomplish something like the above example?
You have to create a duplicate of the stream by piping it into two streams. You can create the duplicate with a PassThrough stream, which simply passes its input through to its output.
const spawn = require('child_process').spawn;
const PassThrough = require('stream').PassThrough;

const a = spawn('echo', ['hi user']);
const b = new PassThrough();
const c = new PassThrough();

// Pipe the same source into both PassThrough copies.
a.stdout.pipe(b);
a.stdout.pipe(c);

let count = 0;
b.on('data', function (chunk) {
  count += chunk.length;
});
b.on('end', function () {
  console.log(count);
  c.pipe(process.stdout);
});
Output:

8
hi user
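Applied to the question's identify / convert example, the same idea would look roughly like this. This is an untested sketch: the second PassThrough is given a generous highWaterMark so it can hold the whole image while identify runs, so it is only suitable for inputs that fit in memory (the next answer discusses what goes wrong when consumers drain at different speeds).

const spawn = require('child_process').spawn;
const fs = require('fs');
const request = require('request');
const PassThrough = require('stream').PassThrough;

const inputStream = request('http://placehold.it/640x360');

// One copy feeds identify immediately; the other sits buffered
// until the size is known and convert can be spawned.
const forIdentify = new PassThrough();
const forConvert = new PassThrough({ highWaterMark: 10 * 1024 * 1024 });
inputStream.pipe(forIdentify);
inputStream.pipe(forConvert);

const identify = spawn('identify', ['-']);
forIdentify.pipe(identify.stdin);

const chunks = [];
identify.stdout.on('data', function (chunk) {
  chunks.push(chunk);
});
identify.stdout.on('end', function () {
  const size = getSize(Buffer.concat(chunks)); // width in pixels
  const convert = spawn('convert', ['-', '-scale', String(size * 0.5), 'png:-']);
  forConvert.pipe(convert.stdin);
  convert.stdout.pipe(fs.createWriteStream('half.png'));
});

function getSize(buffer) {
  return parseInt(buffer.toString().split(' ')[2].split('x')[0], 10);
}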
The first answer only works if streams take roughly the same amount of time to process data. If one takes significantly longer, the faster one will request new data, consequently overwriting the data still being used by the slower one (I had this problem after trying to solve it using a duplicate stream).
The following pattern worked very well for me. It uses Streamz, a library built on Streams2 streams, together with Promises to synchronize asynchronous streams via a callback. Using the familiar example from the first answer:
var spawn = require('child_process').spawn;
var pass = require('stream').PassThrough;
var streamz = require('streamz').PassThrough;
var Promise = require('bluebird');

var a = spawn('echo', ['hi user']);
var b = new pass();
var c = new pass();

a.stdout.pipe(streamz(combineStreamOperations));

function combineStreamOperations(data, next) {
  Promise.join(b, c, function (b, c) {
    // perform n operations on the same data
    next(); // request more
  });
}

var count = 0;
b.on('data', function (chunk) {
  count += chunk.length;
});
b.on('end', function () {
  console.log(count);
  c.pipe(process.stdout);
});
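If the data is small enough to hold in memory, a simpler alternative is to avoid synchronizing live streams altogether: buffer the source once, then create a fresh readable stream per consumer. A minimal sketch (bufferStream is a hypothetical helper, not part of streamz or Node core; Readable.from needs Node 12+):

const { Readable } = require('stream');

// Hypothetical helper: read a stream fully into memory so it
// can be replayed any number of times.
function bufferStream(stream) {
  return new Promise(function (resolve, reject) {
    const chunks = [];
    stream.on('data', function (chunk) { chunks.push(chunk); });
    stream.on('error', reject);
    stream.on('end', function () { resolve(Buffer.concat(chunks)); });
  });
}

// Each consumer gets its own independent Readable, so a slow
// consumer can no longer be overrun by a fast one.
bufferStream(process.stdin).then(function (data) {
  Readable.from([data]).pipe(process.stdout);
  Readable.from([data]).pipe(process.stderr);
});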