Putting data back onto a Readable stream

Tags: stream, node.js

TL;DR How can I read some data from a stream and then put it back allowing other consumers to get the same data event?

Here's a readable stream that streams 1...Infinity:

var Readable = require('stream').Readable;

var readable = new Readable();

var c = 0;

readable._read = function () {

    var self = this;

    setTimeout(function () {
        self.push((++c).toString());
    }, 500);
};

I want to read the first data event, look at the data, and then "reset" the stream to its original state so that another data listener can consume the first event as if it had never happened. I thought unshift() would be the correct method, as the docs say:

readable.unshift(chunk)

chunk Buffer | String Chunk of data to unshift onto the read queue

This is useful in certain cases where a stream is being consumed by a parser, which needs to "un-consume" some data that it has optimistically pulled out of the source, so that the stream can be passed on to some other party.

That sounds perfect for my needs, but it doesn't work how I expect:

...

readable.once('data', function (d) {

    console.log(d.toString());              // Outputs 1

    readable.unshift(d);                    // Put the 1 back on the stream

    readable.on('data', function (d) {
        console.log(d.toString());          // Heh?! Outputs 2, how about 1?
    });

});
Matt Harrison asked Mar 16 '15 08:03


1 Answer

So I figured out the answer:

When you call stream.unshift(), it will emit the data event immediately if the stream is in flowing mode. So by the time I add the listener in my example, the ship has already sailed.

readable.unshift(d);                  // emits 'data' event

readable.on('data', function (d) {    // missed `data` event
    console.log(d.toString());
});

There are a couple of ways to make it work how I expected:

1) Add the new listener before unshifting:

readable.once('data', function (d) {

    console.log(d.toString());              // Outputs 1

    readable.on('data', function (d) {
        console.log(d.toString());          // Outputs 1,1,2,3...
    });

    readable.unshift(d);                    // Put the 1 back on the stream

});

2) Pause and resume the stream:

readable.once('data', function (d) {

    console.log(d.toString());              // Outputs 1
    readable.pause();                       // Stops the stream from flowing
    readable.unshift(d);                    // Put the 1 back on the stream

    readable.on('data', function (d) {
        console.log(d.toString());          // Outputs 1,1,2,3...
    });

    readable.resume();                      // Start the stream flowing again

});
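
For comparison, here is a minimal sketch of the same "peek and put back" idea in paused mode, where data is pulled explicitly with read() instead of arriving through data events, so the timing of listeners does not come into play. It is not from the original post and rests on some assumptions: makeCounter and peekThenDrain are hypothetical helper names, the stream is finite rather than 1...Infinity, it pushes synchronously instead of via setTimeout, and it uses objectMode so that each read() returns exactly one pushed chunk.

```javascript
const { Readable } = require('stream');

// Hypothetical helper (not from the original post): a finite, object-mode
// counter, so that each read() returns exactly one pushed chunk.
function makeCounter(limit) {
  let c = 0;
  return new Readable({
    objectMode: true,
    read() {
      // Push the next number as a string, or null to signal end-of-stream.
      this.push(c < limit ? String(++c) : null);
    }
  });
}

// Peek at the first chunk, put it back, then act as the "second consumer".
function peekThenDrain(source) {
  const peeked = source.read();   // pull the first chunk explicitly
  source.unshift(peeked);         // return it to the front of the read queue

  const seen = [];
  let chunk;
  // Assumes a synchronous source: read() returns null only once it has ended.
  while ((chunk = source.read()) !== null) {
    seen.push(chunk);
  }
  return { peeked, seen };
}

const result = peekThenDrain(makeCounter(3));
console.log(result.peeked, result.seen);
```

Here the second consumer's first chunk is the one that was peeked. With an asynchronous source like the setTimeout-based one in the question, read() can return null while the buffer is temporarily empty, so the loop would need to wait for the readable event instead of stopping.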
Matt Harrison answered Oct 01 '22 10:10