 

Node.js Piping the same readable stream into multiple (writable) targets

I need to run two commands in series that both read data from the same stream. Once a stream has been piped into another, its buffer is drained, so I can't read from it again; this doesn't work:

```javascript
var spawn = require('child_process').spawn;
var fs = require('fs');
var request = require('request');

var inputStream = request('http://placehold.it/640x360');
var identify = spawn('identify', ['-']);

inputStream.pipe(identify.stdin);

var chunks = [];
identify.stdout.on('data', function (chunk) {
  chunks.push(chunk);
});

identify.stdout.on('end', function () {
  var size = getSize(Buffer.concat(chunks)); // width
  var convert = spawn('convert', ['-', '-scale', size * 0.5, 'png:-']);
  inputStream.pipe(convert.stdin);
  convert.stdout.pipe(fs.createWriteStream('half.png'));
});

function getSize(buffer) {
  return parseInt(buffer.toString().split(' ')[2].split('x')[0]);
}
```

The request module complains about this:

Error: You cannot pipe after data has been emitted from the response. 

and changing the inputStream to fs.createReadStream yields the same issue, of course. I don't want to write to a file; I want to reuse, in some way, the stream that request produces (or any other readable stream, for that matter).

Is there a way to reuse a readable stream once it finishes piping? What would be the best way to accomplish something like the above example?

Asked Oct 23 '13 by Maroshii


2 Answers

You have to create a duplicate of the stream by piping it into two streams. You can do this with a PassThrough stream, which simply passes its input through to its output.

```javascript
const spawn = require('child_process').spawn;
const PassThrough = require('stream').PassThrough;

const a = spawn('echo', ['hi user']);
const b = new PassThrough();
const c = new PassThrough();

a.stdout.pipe(b);
a.stdout.pipe(c);

let count = 0;
b.on('data', function (chunk) {
  count += chunk.length;
});
b.on('end', function () {
  console.log(count);
  c.pipe(process.stdout);
});
```

Output:

```
8
hi user
```
Answered Sep 20 '22 by user568109

The first answer only works if both streams take roughly the same amount of time to process data. If one takes significantly longer, the faster one will request new data, overwriting the data the slower one is still using (I ran into this problem after trying to solve it with a duplicated stream).

The following pattern worked very well for me. It uses Streamz, a library built on streams2, together with Promises to synchronize asynchronous streams via a callback. Using the familiar example from the first answer:

```javascript
var spawn = require('child_process').spawn;
var pass = require('stream').PassThrough;
var streamz = require('streamz').PassThrough;
var Promise = require('bluebird');

var a = spawn('echo', ['hi user']);
var b = new pass();
var c = new pass();

a.stdout.pipe(streamz(combineStreamOperations));

function combineStreamOperations(data, next) {
  Promise.join(b, c, function (b, c) {
    // perform n operations on the same data
  }).then(function () {
    next(); // request more data only once both consumers are done
  });
}

var count = 0;
b.on('data', function (chunk) {
  count += chunk.length;
});
b.on('end', function () {
  console.log(count);
  c.pipe(process.stdout);
});
```
Answered Sep 21 '22 by artikas