I am trying to implement a stream with the new Node.js streams API that will buffer a certain amount of data. When this stream is piped to another stream, or if something consumes `readable` events, this stream should flush its buffer and then simply become pass-through. The catch is, this stream will be piped to many other streams, and when each destination stream is attached, the buffer must be flushed even if it has already been flushed to another stream.
For example:

1. `BufferStream` implements `stream.Transform`, and keeps a 512KB internal ring buffer.
2. `ReadableStreamA` is piped to an instance of `BufferStream`.
   - `BufferStream` writes to its ring buffer, reading data from `ReadableStreamA` as it comes in. (It doesn't matter if data is lost, as the buffer overwrites old data.)
3. `BufferStream` is piped to `WritableStreamB`.
   - `WritableStreamB` receives the entire 512KB buffer, and continues to get data as it is written from `ReadableStreamA` through `BufferStream`.
4. `BufferStream` is piped to `WritableStreamC`.
   - `WritableStreamC` also receives the entire 512KB buffer, but this buffer is now different from what `WritableStreamB` received, because more data has since been written to `BufferStream`.

Is this possible with the streams API? The only method I can think of would be to create an object with a method that spins up a new PassThrough stream for each destination, meaning I couldn't simply pipe to and from it.
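A rough sketch of that factory-style alternative might look something like this (the `TapBuffer` and `tap()` names are just placeholders, not an existing API, and the ring-buffer bookkeeping is reduced to a plain array):

var stream = require('stream')

// TapBuffer wraps a source stream, remembers its data, and hands out a fresh
// PassThrough per destination, primed with everything buffered so far.
function TapBuffer(source) {
  this.chunks = [] // stand-in for the 512KB ring buffer
  this.taps = []
  var self = this
  source.on('data', function (chunk) {
    self.chunks.push(chunk)              // remember it for future taps
    self.taps.forEach(function (tap) {   // feed every attached tap live data
      tap.write(chunk)
    })
  })
}

TapBuffer.prototype.tap = function () {
  var out = new stream.PassThrough()
  if (this.chunks.length) {
    out.write(Buffer.concat(this.chunks)) // replay the buffer first
  }
  this.taps.push(out)                     // then keep it fed with live data
  return out
}

// usage: new TapBuffer(readableStreamA).tap().pipe(writableStreamB)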
For what it's worth, I've done this with the old "flowing" API by simply listening for new handlers on `data` events. When a new function was attached with `.on('data')`, I would call it directly with a copy of the ring buffer.
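That flowing-mode trick can be sketched roughly like this (it relies on the standard `newListener` event an EventEmitter fires whenever `.on()` attaches a handler; the actual ring-buffer bookkeeping is omitted):

var util = require('util')
var EventEmitter = require('events').EventEmitter

function RingSource() {
  EventEmitter.call(this)
  this.ring = [] // array of Buffers standing in for the 512KB ring buffer

  var self = this
  // 'newListener' fires just before a handler is attached with .on()/.once()
  this.on('newListener', function (event, listener) {
    if (event === 'data' && self.ring.length) {
      // hand the new consumer a copy of everything buffered so far
      listener(Buffer.concat(self.ring))
    }
  })
}
util.inherits(RingSource, EventEmitter)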
Here's my take on your issue.
The basic idea is to create a `Transform` stream, which will allow us to execute your custom buffering logic before sending the data on to the output of the stream:
var util = require('util')
var stream = require('stream')

var BufferStream = function (streamOptions) {
  stream.Transform.call(this, streamOptions)
  this.buffer = Buffer.alloc(0)
}
util.inherits(BufferStream, stream.Transform)

BufferStream.prototype._transform = function (chunk, encoding, done) {
  // custom buffering logic
  // ie. append the chunk to this.buffer, check the buffer size, etc.
  this.buffer = Buffer.concat([this.buffer, chunk])
  this.push(chunk)
  done()
}
Then, we need to override the `.pipe()` method so that we are notified when the `BufferStream` is piped into a stream, which allows us to automatically write data to it:
BufferStream.prototype.pipe = function (destination, options) {
  // set up the pipe as usual, then prime the new destination with
  // everything buffered so far (pipe() returns the destination stream)
  var res = BufferStream.super_.prototype.pipe.call(this, destination, options)
  res.write(this.buffer)
  return res
}
In this way, when we write `buffer.pipe(someStream)`, we perform the pipe as intended and write the internal buffer to the output stream. After that, the `Transform` class takes care of everything, while keeping track of backpressure and whatnot.
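A quick usage sketch (the file names here are just placeholders) could look like this:

var fs = require('fs')

var bufferStream = new BufferStream()
fs.createReadStream('source.log').pipe(bufferStream)

// each destination gets whatever is buffered at the moment it is attached,
// then keeps receiving live data through the Transform
bufferStream.pipe(fs.createWriteStream('copy-a.log'))

setTimeout(function () {
  bufferStream.pipe(fs.createWriteStream('copy-b.log'))
}, 1000)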
Here is a working gist. Please note that I didn't bother writing correct buffering logic (i.e. I don't care about the size of the internal buffer), but this should be easy to fix.
Paul's answer is good, but I don't think it meets the exact requirements. It sounds like what needs to happen is that every time `pipe()` is called on this transform stream, it needs to first flush the buffer representing all the data accumulated between the time the transform stream was created (connected to the source stream) and the time it was connected to the current writable/destination stream.
Something like this might be more correct:
var util = require('util');
var stream = require('stream');

var BufferStream = function () {
  stream.Transform.apply(this, arguments);
  this.buffer = []; // I guess an array will do
};
util.inherits(BufferStream, stream.Transform);

BufferStream.prototype._transform = function (chunk, encoding, done) {
  var data = String(chunk);
  this.push(data);        // pass the chunk through
  this.buffer.push(data); // and remember it for destinations piped later
  done();
};

BufferStream.prototype.pipe = function (destination, options) {
  var res = BufferStream.super_.prototype.pipe.apply(this, arguments);
  // replay everything buffered so far to the newly attached destination
  this.buffer.forEach(function (b) {
    res.write(b);
  });
  return res;
};

var bufferStream = new BufferStream();
I suppose this:
BufferStream.super_.prototype.pipe.apply(this, arguments);
is equivalent to this:
stream.Transform.prototype.pipe.apply(this, arguments);
You could probably optimize this and use some flags when pipe/unpipe are called.
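One way that flag idea might look, as a rough sketch only (the bookkeeping for multiple sources piping into the same destination is not handled here):

BufferStream.prototype.pipe = function (destination, options) {
  var res = BufferStream.super_.prototype.pipe.apply(this, arguments);

  // replay the buffer to the newly attached destination, as before
  this.buffer.forEach(function (b) {
    res.write(b);
  });

  // track how many destinations are currently attached; a writable emits
  // 'unpipe' with the source stream as argument when it is detached
  var self = this;
  this.pipeCount = (this.pipeCount || 0) + 1;
  destination.on('unpipe', function (src) {
    if (src === self) self.pipeCount -= 1;
  });

  return res;
};

What _transform then does with this.pipeCount (cap the buffer, stop growing it, etc.) depends on the buffering policy you actually want.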