 

Piping data to writable stream that is not ready to receive data yet

Is there a way to connect a readable stream to a writable stream in Node.js, where the writable is not ready to receive data yet? In other words, I'd like to connect the readable with the writable, but I want to initialize the writable, including defining the write method, at a later point in the program. Perhaps we must implement the write method up front, but is there a way to pause a writable stream in a similar way to how you can pause a readable stream? Or perhaps we could use an intermediate through/transform stream and buffer the data there before we pipe it to the writable?

As an example, normally we do:

readable.pipe(transform).pipe(writable);

but I want to do something like:

const tstrm = readable.pipe(transform);

doSomethingAsync().then(function(){

      tstrm.pipe(writable);

});

I'm just wondering whether this is possible and how to do it right; so far I'm having trouble figuring out both.

I guess I am looking to buffer the data in an intermediary transform stream before it's connected/piped to a writable stream, and then, once it is connected, stream the buffered data first, before any new data. That seems like a reasonable thing to do, but I can't find any info on it.

Alexander Mills asked Apr 12 '16


1 Answer

Note: I'm using an interval here to mimic the writer being ready (or not) to receive data. You can drive this any way you want, e.g. if write() returns false you would update the state to start buffering, etc. I think the last line is what you want, i.e.

r.pipe(b).pipe(w);

This reads as follows:

readStream.pipe(transformBuffer).pipe(writeStream);

Here is the example code; there are some changes we can make to buffer all of the data, which I'll describe after the code. Everything you need to know about streams and more is in the docs. I think they could do with more complete examples, but they're pretty good as is:

https://nodejs.org/api/stream.html#stream_class_stream_transform_1

Here is the code:

var fs     = require('fs');
var stream = require('stream');
const util = require('util');
const Transform = require('stream').Transform;
var check_buff  = 0;
var DRAIN_ME    = 0;

var r = fs.createReadStream('file1.txt').setEncoding('utf8');
var w = fs.createWriteStream('file2.txt');

var BufferStream = function () {
  stream.Transform.apply(this, arguments);
  this.buffer = [];
};

util.inherits(BufferStream, stream.Transform);

// Mimic the writer flipping between ready and not ready:
// every third tick, allow the buffer to drain.
var intId;
intId = setInterval(function(){
  check_buff++;
  if(check_buff % 3 == 0) {
    DRAIN_ME = 1;
    return;
  }
  DRAIN_ME = 0;
},10);

BufferStream.prototype._transform = function (chunk, encoding, done) {
  this.buffer.push(String(chunk));
  // Only release buffered chunks while the "writer" is ready.
  while(DRAIN_ME > 0 && this.buffer.length > 0) {
    this.push(this.buffer.shift());
  }
  console.log(chunk.length);
  console.log(this.buffer.length);
  done();
};

var b = new BufferStream();
b.on('end', function() {
  clearInterval(intId);
});
r.pipe(b).pipe(w);

I am looking for the canonical way to implement a transform/through stream that buffers all data until pipe is called on it.

Make the following changes:

BufferStream.prototype._transform = function (chunk, encoding, done) {
  // Buffer every chunk; push nothing downstream yet.
  this.buffer.push(String(chunk));

  console.log(chunk.length);
  console.log(this.buffer.length);
  done();
};
......
// _flush runs once the source has ended: release everything in order.
BufferStream.prototype._flush = function (cb) {
  var len = this.buffer.length;
  for (var i = 0; i < len; i++) {
    this.push(this.buffer.shift());
  }
  cb();
};

You can also pause the readable stream, which in effect pauses the writable stream because it stops receiving data, i.e.:

To test this, create a fairly large file on disk, i.e. 100MB or more, and run this:

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

var ready = 0;
var toggling = false;

readableStream.on('data', function(chunk) {
  // Pause right away; the interval below will toggle us back on.
  readableStream.pause();
  // Install the toggle once, on the first chunk, so we don't
  // stack up a new interval for every 'data' event.
  if (!toggling) {
    toggling = true;
    setInterval(function(){
      if(ready == 0) {
        //console.log('pausing');
        readableStream.pause();
        ready = 1;
      }
      else {
        //console.log('resuming');
        readableStream.resume();
        ready = 0;
      }
    },100);
  }
  writableStream.write(chunk);
});

The reason for the immediate pause is that by the time the interval first fires, 100ms later, the file may already have been written. There are variations on this, i.e.:

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');
readableStream.setEncoding('utf8');

var ready = 0;
// Toggle the readable between paused and flowing every 100ms.
setInterval(function(){
  if(ready == 0) {
    //console.log('pausing');
    readableStream.pause();
    ready = 1;
  }
  else {
    //console.log('resuming');
    readableStream.resume();
    ready = 0;
  }
},100);

readableStream.on('data', function(chunk) {
  writableStream.write(chunk);
  // Pause immediately; the interval will resume us on its next tick.
  readableStream.pause();
});
Harry answered Nov 14 '22