
Implementing a buffered transform stream

I am trying to implement a stream with the new Node.js streams API that will buffer a certain amount of data. When this stream is piped to another stream, or when something consumes its readable events, it should flush its buffer and then simply become a pass-through. The catch is that this stream will be piped to many other streams, and each time a destination stream is attached, the buffer must be flushed to it, even if it has already been flushed to another stream.

For example:

  1. BufferStream implements stream.Transform, and keeps a 512KB internal ring buffer
  2. ReadableStreamA is piped to an instance of BufferStream
  3. BufferStream writes to its ring buffer, reading data from ReadableStreamA as it comes in. (It doesn't matter if data is lost, as the buffer overwrites old data.)
  4. BufferStream is piped to WritableStreamB
  5. WritableStreamB receives the entire 512KB buffer, and continues to get data as it is written from ReadableStreamA through BufferStream.
  6. BufferStream is piped to WritableStreamC
  7. WritableStreamC also receives the entire 512KB buffer, but this buffer is now different than what WritableStreamB received, because more data has since been written to BufferStream.

Is this possible with the streams API? The only method I can think of would be to create an object with a method that spins up a new PassThrough stream for each destination, meaning I couldn't simply pipe to and from it.
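For reference, that fallback idea might look roughly like the sketch below. The names createReplaySource, write, attach and limit are hypothetical, a plain Buffer trimmed to the newest bytes stands in for the ring buffer, and backpressure and cleanup of finished destinations are ignored.

```javascript
var stream = require('stream')

// Hypothetical factory: `limit` is the number of most recent bytes to keep
function createReplaySource (limit) {
  var history = Buffer.alloc(0)
  var outputs = []

  return {
    // Record the chunk, then forward it to every attached destination
    write: function (chunk) {
      var joined = Buffer.concat([history, chunk])
      history = joined.subarray(Math.max(0, joined.length - limit))
      outputs.forEach(function (out) { out.write(chunk) })
    },
    // Create a PassThrough that starts with a copy of the current history
    attach: function () {
      var out = new stream.PassThrough()
      if (history.length > 0) out.write(Buffer.from(history))
      outputs.push(out)
      return out
    }
  }
}
```

Each destination would then be fed with something like createReplaySource(512 * 1024).attach().pipe(destination), which is exactly the "can't simply pipe to and from it" drawback.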

For what it's worth, I've done this with the old "flowing" API by simply listening for new handlers on data events. When a new function was attached with .on('data'), I would call it directly with a copy of the ring buffer.
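A minimal sketch of how that kind of replay can be done, assuming the 'newListener' event of EventEmitter is used to detect the new handler (the exact mechanism is not stated above). The names source, ring and received are placeholders.

```javascript
var EventEmitter = require('events').EventEmitter

var source = new EventEmitter()
var ring = Buffer.from('buffered data') // stand-in for the ring buffer contents

// 'newListener' fires just before a listener is added, so the new handler
// can be called directly with a copy of the buffered data.
source.on('newListener', function (event, listener) {
  if (event === 'data') listener(Buffer.from(ring))
})

var received = []
source.on('data', function (chunk) { received.push(chunk.toString()) })
source.emit('data', Buffer.from('live'))
console.log(received) // [ 'buffered data', 'live' ]
```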

Brad asked Dec 01 '13 21:12


2 Answers

Here's my take on your issue.

The basic idea is to create a Transform stream, which lets us run custom buffering logic before passing the data on to the stream's output:

var util = require('util')
var stream = require('stream')

var BufferStream = function (streamOptions) {
  stream.Transform.call(this, streamOptions)
  // Buffer.alloc replaces the deprecated `new Buffer('')`
  this.buffer = Buffer.alloc(0)
}

util.inherits(BufferStream, stream.Transform)

BufferStream.prototype._transform = function (chunk, encoding, done) {
  // custom buffering logic
  // i.e. add chunk to this.buffer, check buffer size, etc.
  // Placeholder: keeps a copy of only the most recent chunk
  this.buffer = Buffer.from(chunk)

  this.push(chunk)
  done()
}

Then, we need to override the .pipe() method so that we are notified when the BufferStream is piped into a stream, which allows us to automatically write the buffered data to it:

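BufferStream.prototype.pipe = function (destination, options) {
  // Set up the normal pipe first; pipe() returns the destination stream
  var res = BufferStream.super_.prototype.pipe.call(this, destination, options)
  // Write the buffered data directly (the return value of write() is ignored)
  res.write(this.buffer)
  return res
}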

This way, when we call pipe(someStream) on a BufferStream instance, we set up the pipe as usual and also write the internal buffer to the destination stream. After that, the Transform class takes care of everything else, including backpressure for the data that flows through the pipe. Note that the direct write of the buffer bypasses that backpressure handling.

Here is a working gist. Please note that I didn't bother writing correct buffering logic (i.e. I don't limit the size of the internal buffer), but this should be easy to fix.
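As a rough sketch of that missing piece, the variant below keeps only the newest bytes up to a limit passed to the constructor. The names appendBounded, BoundedBufferStream and limit are hypothetical and not taken from the gist, and the code assumes default stream options so that chunks arrive as Buffers.

```javascript
var util = require('util')
var stream = require('stream')

// Hypothetical helper: append `chunk` to `buffer` and keep only the newest
// `limit` bytes, so the oldest data is dropped once the limit is reached.
function appendBounded (buffer, chunk, limit) {
  var joined = Buffer.concat([buffer, chunk])
  if (joined.length <= limit) return joined
  return joined.subarray(joined.length - limit)
}

var BoundedBufferStream = function (limit, streamOptions) {
  stream.Transform.call(this, streamOptions)
  this.limit = limit
  this.buffer = Buffer.alloc(0)
}

util.inherits(BoundedBufferStream, stream.Transform)

BoundedBufferStream.prototype._transform = function (chunk, encoding, done) {
  // Update the bounded history, then pass the chunk through unchanged
  this.buffer = appendBounded(this.buffer, chunk, this.limit)
  this.push(chunk)
  done()
}
```

Concatenating on every chunk is simple but copies the whole history each time; a real ring buffer would avoid that cost.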

Paul Mougel answered Nov 14 '22 06:11


Paul's answer is good, but I don't think it meets the exact requirements. It sounds like every time pipe() is called on this transform stream, it first needs to flush a buffer holding all the data accumulated between the time the transform stream was created (or connected to the source stream) and the time it was connected to the current writable destination stream.

Something like this might be more correct:

    var util = require('util');
    var stream = require('stream');

    var BufferStream = function () {
        stream.Transform.apply(this, arguments);
        this.buffer = []; // chunks seen so far, in arrival order
    };

    util.inherits(BufferStream, stream.Transform);

    BufferStream.prototype._transform = function (chunk, encoding, done) {
        // Keep chunks as received: String(chunk) would corrupt binary data,
        // and pushing null would end the stream
        this.buffer.push(chunk);
        this.push(chunk);
        done();
    };

    BufferStream.prototype.pipe = function (destination, options) {
        var res = BufferStream.super_.prototype.pipe.apply(this, arguments);
        // Replay everything buffered so far into the new destination
        this.buffer.forEach(function (b) {
            res.write(b);
        });
        return res;
    };

    // Assumes this snippet is the body of a factory function
    return new BufferStream();

I suppose this:

BufferStream.super_.prototype.pipe.apply(this, arguments);

is equivalent to this:

stream.Transform.prototype.pipe.apply(this, arguments);
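A quick way to check that, relying on the fact that util.inherits sets super_ on the child constructor to the parent constructor (the variable names sameConstructor and samePipe are only for illustration):

```javascript
var util = require('util')
var stream = require('stream')

function BufferStream () {
  stream.Transform.apply(this, arguments)
}
util.inherits(BufferStream, stream.Transform)

// util.inherits stores the parent constructor on BufferStream.super_
var sameConstructor = BufferStream.super_ === stream.Transform
var samePipe = BufferStream.super_.prototype.pipe === stream.Transform.prototype.pipe

console.log(sameConstructor, samePipe) // true true
```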

You could probably optimize this and use some flags when pipe/unpipe are called.

Alexander Mills answered Nov 14 '22 06:11
