TL;DR: How can I read some data from a stream and then put it back, allowing other consumers to get the same data event?
Here's a readable stream that streams 1...Infinity:
var Readable = require('stream').Readable;
var readable = new Readable();
var c = 0;
readable._read = function () {
  var self = this;
  setTimeout(function () {
    self.push((++c).toString());
  }, 500);
};
I want to read the first data event, look at the data, and then "reset" the stream to its original state so that another data listener can consume the first event as if it never happened. I thought unshift() would be the correct method, as the docs say:
readable.unshift(chunk)
chunk (Buffer | String): Chunk of data to unshift onto the read queue.
This is useful in certain cases where a stream is being consumed by a parser, which needs to "un-consume" some data that it has optimistically pulled out of the source, so that the stream can be passed on to some other party.
That sounds perfect for my needs but it doesn't work how I expect:
...
readable.once('data', function (d) {
  console.log(d.toString()); // Outputs 1
  readable.unshift(d); // Put the 1 back on the stream
  readable.on('data', function (d) {
    console.log(d.toString()); // Heh?! Outputs 2, how about 1?
  });
});
So I figured out the answer:
When you call stream.unshift(), it will emit the data event immediately if the stream is in flowing mode. So by the time I add the listener in my example, the ship has already sailed.
readable.unshift(d); // emits 'data' event
readable.on('data', function (d) { // missed `data` event
  console.log(d.toString());
});
There are a couple of ways to make it work how I expected:
1) Add the new listener before unshifting:
readable.once('data', function (d) {
  console.log(d.toString()); // Outputs 1
  readable.on('data', function (d) {
    console.log(d.toString()); // Outputs 1,1,2,3...
  });
  readable.unshift(d); // Put the 1 back on the stream
});
2) Pause and resume the stream:
readable.once('data', function (d) {
  console.log(d.toString()); // Outputs 1
  readable.pause(); // Stops the stream from flowing
  readable.unshift(d); // Put the 1 back on the stream
  readable.on('data', function (d) {
    console.log(d.toString()); // Outputs 1,1,2,3...
  });
  readable.resume(); // Start the stream flowing again
});
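For anyone who wants to reuse the pause-and-resume approach, here is a minimal sketch that wraps it in a helper. The names peekFirstChunk and demo are made up for illustration and are not part of the stream API. The finite three-chunk stream and the 10 ms delay are also assumptions, chosen only so the example terminates.

```javascript
const { Readable } = require('stream');

// Hypothetical helper (not a Node API): lets `inspect` look at the first
// chunk, then puts the chunk back so later 'data' listeners still see it.
function peekFirstChunk(stream, inspect) {
  stream.once('data', function (chunk) {
    stream.pause();        // stop flowing so the unshifted chunk stays buffered
    stream.unshift(chunk); // put the chunk back at the front of the read queue
    inspect(chunk);        // the caller can attach its own 'data' listener here
    stream.resume();       // start flowing again, beginning with the same chunk
  });
}

// Demo on a finite stream that emits '1', '2', '3' and then ends.
function demo() {
  return new Promise(function (resolve) {
    let c = 0;
    const readable = new Readable({
      read() {
        const self = this;
        setTimeout(function () {
          c += 1;
          self.push(c <= 3 ? String(c) : null); // null signals end of stream
        }, 10);
      }
    });

    const seen = [];
    peekFirstChunk(readable, function (first) {
      seen.push('peek:' + first.toString());
      readable.on('data', function (d) {
        seen.push(d.toString());
      });
    });
    readable.on('end', function () {
      resolve(seen);
    });
  });
}

demo().then(function (seen) {
  console.log(seen.join(','));
});
```

Pausing before the unshift() means the helper does not depend on exactly when unshift() emits the data event, so the listener attached inside inspect should receive the first chunk again before anything else.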