I need to "pause" a readable stream for a certain number of seconds and then resume it. The readable stream is piped to a transform stream, so I cannot use the regular pause and resume methods; I had to use unpipe and pipe instead. In the transform stream, I detect the pipe event and call unpipe on the readable stream; then, after a number of seconds, I call pipe again to resume it (I hope).
Here is the code:
import {Transform, Readable} from 'stream';

const alphaTransform = new class extends Transform {
    constructor() {
        super({
            objectMode: true,
            transform: (chunk: string | Buffer, encoding: string, callback: Function) => {
                let transformed: IterableIterator<string>;
                if (Buffer.isBuffer(chunk)) {
                    transformed = function* () {
                        for (const val of chunk) {
                            yield String.fromCharCode(val);
                        }
                    }();
                } else {
                    transformed = chunk[Symbol.iterator]();
                }
                callback(null,
                    Array.from(transformed).map(s => s.toUpperCase()).join(''));
            }
        });
    }
}

const spyingAlphaTransformStream = new class extends Transform {
    private oncePaused = false;

    constructor() {
        super({
            transform: (chunk: string | Buffer, encoding: string, callback: Function) => {
                console.log('Before transform:');
                if (Buffer.isBuffer(chunk)) {
                    console.log(chunk.toString('utf-8'));
                    alphaTransform.write(chunk);
                } else {
                    console.log(chunk);
                    alphaTransform.write(chunk, encoding);
                }
                callback(null, alphaTransform.read());
            }
        });

        this.on('pipe', (src: Readable) => {
            if (!this.oncePaused) {
                src.unpipe(this); // Here I unpipe the readable stream
                console.log(`Data event listeners count: ${src.listeners('data').length}`);
                console.log(`Readable state of reader: ${src.readable}`);
                console.log("We paused the reader!!");
                setTimeout(() => {
                    this.oncePaused = true;
                    src.pipe(this); // Here I resume it...hopefully?
                    src.resume();
                    console.log("We unpaused the reader!!");
                    console.log(`Data event listeners count: ${src.listeners('data').length}`);
                    console.log(`Readable state of reader: ${src.readable}`);
                }, 1000);
            }
        });

        this.on('data', (transformed) => {
            console.log('After transform:\n', transformed);
        });
    }
}

const reader = new class extends Readable {
    constructor(private content?: string | Buffer) {
        super({
            read: (size?: number) => {
                if (!this.content) {
                    this.push(null);
                } else {
                    this.push(this.content.slice(0, size));
                    this.content = this.content.slice(size);
                }
            }
        });
    }
} (new Buffer('The quick brown fox jumps over the lazy dog.\n'));

reader.pipe(spyingAlphaTransformStream)
    .pipe(process.stdout);
The problem is with the middle stream, spyingAlphaTransformStream. This is the one that listens for the pipe event, pauses the readable stream, and resumes it after 1 second. After it unpipes the readable stream and then pipes from it again, nothing is written to standard output, which means the transform method of spyingAlphaTransformStream is never called, so something is broken in the stream.
I expect the output to look something like:
Data event listeners count: 0
Readable state of reader: true
We paused the reader!!
We unpaused the reader!!
Data event listeners count: 1
Readable state of reader: true
Before transform:
The quick brown fox jumps over the lazy dog.
After transform:
THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.
THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.
But it actually looks like:
Data event listeners count: 0
Readable state of reader: true
We paused the reader!!
We unpaused the reader!!
Data event listeners count: 1
Readable state of reader: true
From that, I can only conclude that nothing is being piped from the readable.
How can I fix this?
package.json:
{
    "name": "hello-stream",
    "version": "1.0.0",
    "main": "main.ts",
    "scripts": {
        "start": "npm run build:live",
        "build:live": "nodemon"
    },
    "keywords": [
        "typescript",
        "nodejs",
        "ts-node",
        "cli",
        "node",
        "hello"
    ],
    "license": "WTFPL",
    "devDependencies": {
        "@types/node": "^7.0.21",
        "nodemon": "^1.11.0",
        "ts-node": "^3.0.4",
        "typescript": "^2.3.2"
    },
    "dependencies": {}
}

nodemon.json:
{
    "ignore": ["node_modules"],
    "delay": "2000ms",
    "execMap": {
        "ts": "ts-node"
    },
    "runOnChangeOnly": false,
    "verbose": true
}

tsconfig.json:
{
    "compilerOptions": {
        "target": "es2015",
        "module": "commonjs",
        "typeRoots": ["node_modules/@types"],
        "lib": ["es6", "dom"],
        "strict": true,
        "noUnusedLocals": true,
        "types": ["node"]
    }
}
The solution turned out to be simpler than I had expected. I needed a way to defer the callback passed to the transform method and hold it until the stream was "ready".
Basically, the transform stream keeps a boolean flag that records whether the stream is ready. If it is not ready, the transform method does not invoke the callback it received; instead, it stores a closure on the class that will invoke that callback later. Because that first callback has not been called, the stream receives no further transform calls, so there is only ever one pending callback to worry about. From there it is simply a waiting game until the stream becomes ready (here, after a simple setTimeout).
When the stream is "ready", I set the ready flag to true and call the pending callback (if one is set). From that point on, data flows through the entire pipeline.
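The deferral described above can be reduced to a few lines. The following is a minimal sketch, not the exact code from this answer: `GatedTransform` and its `open()` method are hypothetical names, it overrides `_transform` instead of passing a `transform` option, and `open()` is called by hand where the answer uses a setTimeout, so the effect can be observed synchronously.

```typescript
import { Transform } from 'stream';

// Hypothetical name: a pass-through transform that withholds its first
// callback until open() is called.
class GatedTransform extends Transform {
  private ready = false;
  private pending?: () => void;

  constructor() {
    super({ objectMode: true });
  }

  _transform(
    chunk: unknown,
    _encoding: string,
    callback: (error?: Error | null, data?: unknown) => void
  ): void {
    const forward = () => callback(null, chunk);
    if (this.ready) {
      forward();
    } else {
      // Not calling the callback yet means no further chunks are
      // handed to _transform until this one is released.
      this.pending = forward;
    }
  }

  // Assumption: the caller decides when the stream is ready
  // (the answer does this from a setTimeout).
  open(): void {
    this.ready = true;
    const pending = this.pending;
    this.pending = undefined;
    if (pending) {
      pending();
    }
  }
}

const gate = new GatedTransform();
gate.write('held');

const beforeOpen = gate.read(); // nothing has been pushed yet
gate.open();
const afterOpen = gate.read(); // the held chunk is now available

console.log(beforeOpen); // null
console.log(afterOpen); // held
```

In a real pipeline, `open()` would be triggered by whatever signals readiness, for example a timer started when the stream is piped into.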
I have a longer example to show how this works:
import {Transform, Readable} from 'stream';

// Upper-cases every chunk that passes through it.
const alphaTransform = new class extends Transform {
    constructor() {
        super({
            objectMode: true,
            transform: (chunk: string | Buffer, encoding: string, callback: Function) => {
                let transformed: IterableIterator<string>;
                if (Buffer.isBuffer(chunk)) {
                    transformed = function* () {
                        for (const val of chunk) {
                            yield String.fromCharCode(val);
                        }
                    }();
                } else {
                    transformed = chunk[Symbol.iterator]();
                }
                callback(null,
                    Array.from(transformed).map(s => s.toUpperCase()).join(''));
            }
        });
    }
}

// Logs each chunk and passes it on unchanged, but holds the first chunk
// until three seconds after something has been piped into the stream.
class LoggingStream extends Transform {
    private pending?: () => void;
    private isReady = false;

    constructor(message: string) {
        super({
            objectMode: true,
            transform: (chunk: string | Buffer, encoding: string, callback: Function) => {
                // Log the chunk with a timestamp, then hand it downstream.
                const logAndForward = () => {
                    console.log(message);
                    const text = Buffer.isBuffer(chunk) ? chunk.toString('utf-8') : chunk;
                    console.log(`[${new Date().toTimeString()}]: ${text}`);
                    callback(null, chunk);
                };
                if (!this.isReady) { // ready flag
                    this.pending = logAndForward; // create a pending callback
                } else {
                    logAndForward();
                }
            }
        });
        this.on('pipe', this.pauseOnPipe);
    }

    private pauseOnPipe() {
        this.removeListener('pipe', this.pauseOnPipe);
        setTimeout(() => {
            this.isReady = true; // set ready flag to true
            if (this.pending) { // execute the pending callback (if any)
                this.pending();
            }
        }, 3000); // wait three seconds
    }
}

const reader = new class extends Readable {
    constructor(private content?: string | Buffer) {
        super({
            read: (size?: number) => {
                if (!this.content) {
                    this.push(null);
                } else {
                    this.push(this.content.slice(0, size));
                    this.content = this.content.slice(size);
                }
            }
        });
    }
} (Buffer.from('The quick brown fox jumps over the lazy dog.\n')); // Buffer.from replaces the deprecated new Buffer()

reader.pipe(new LoggingStream("Before transformation:"))
    .pipe(alphaTransform)
    .pipe(new LoggingStream("After transformation:"))
    .pipe(process.stdout);
<Waits about 3 seconds...>
Before transformation:
[11:13:53 GMT-0600 (CST)]: The quick brown fox jumps over the lazy dog.
After transformation:
[11:13:53 GMT-0600 (CST)]: THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.
THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.
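Note that both logging streams wait the same three seconds, not three seconds each. Each one starts its timer when it is piped into, and both pipe events fire while the chain is being set up, so on JS's single thread the two timers run concurrently and expire at about the same time.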
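One way to see why both streams wait the same amount of time is to check when their pipe events fire. The sketch below uses plain PassThrough streams as stand-ins for LoggingStream; `makeStage` and `pipeLog` are hypothetical names introduced only for this illustration.

```typescript
import { PassThrough } from 'stream';

// Records the name of each stage at the moment it is piped into.
const pipeLog: string[] = [];

// Hypothetical stand-in for LoggingStream.
function makeStage(name: string): PassThrough {
  const stage = new PassThrough();
  // LoggingStream would start its setTimeout inside this handler.
  stage.on('pipe', () => pipeLog.push(name));
  return stage;
}

const source = new PassThrough();
source.pipe(makeStage('before')).pipe(makeStage('after'));

// No data has been written and no timer could have fired yet,
// but both stages have already received their 'pipe' event.
const stagesPipedDuringSetup = pipeLog.slice();
console.log(stagesPipedDuringSetup); // [ 'before', 'after' ]
```

Because both handlers run back to back during setup, timers started in them expire together, which matches the identical timestamps in the output above.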