
Node.js stream: pausing (unpipe) and resuming (pipe) an intermediate pipe

I need to "pause" a readable stream for a certain number of seconds and then resume it. The readable stream is piped to a transform stream, so I cannot use the regular pause and resume methods; I had to use unpipe and pipe instead. In the transform stream, I detect the pipe event, call unpipe on the readable stream and then, after a number of seconds, call pipe again to resume it (I hope).

Here is the code:

main.ts

import {Transform, Readable} from 'stream';

const alphaTransform = new class extends Transform {
    constructor() {
        super({
            objectMode: true,
            transform: (chunk: string | Buffer, encoding: string, callback: Function) => {
                let transformed: IterableIterator<string>;
                if (Buffer.isBuffer(chunk)) {
                    transformed = function* () {
                        for (const val of chunk) {
                            yield String.fromCharCode(val);
                        }
                    }();
                } else {
                    transformed = chunk[Symbol.iterator]();
                }
                callback(null,
                    Array.from(transformed).map(s => s.toUpperCase()).join(''));
            }
        });
    }
}

const spyingAlphaTransformStream =  new class extends Transform {
    private oncePaused = false;

    constructor() {
        super({
            transform: (chunk: string | Buffer, encoding: string, callback: Function) => {
                console.log('Before transform:');
                if (Buffer.isBuffer(chunk)) {
                    console.log(chunk.toString('utf-8'));
                    alphaTransform.write(chunk);
                } else {
                    console.log(chunk);
                    alphaTransform.write(chunk, encoding);
                }
                callback(null, alphaTransform.read());
            }
        });

        this.on('pipe', (src: Readable) => {
            if (!this.oncePaused) {
                src.unpipe(this); // Here I unpipe the readable stream
                console.log(`Data event listeners count: ${src.listeners('data').length}`);
                console.log(`Readable state of reader: ${src.readable}`);
                console.log("We paused the reader!!");
                setTimeout(() => {
                    this.oncePaused = true;
                    src.pipe(this); // Here I resume it...hopefully?
                    src.resume();
                    console.log("We unpaused the reader!!");
                    console.log(`Data event listeners count: ${src.listeners('data').length}`);
                    console.log(`Readable state of reader: ${src.readable}`);
                }, 1000);
            }
        });

        this.on('data', (transformed) => {
            console.log('After transform:\n', transformed);
        });
    }
}

const reader = new class extends Readable {
    constructor(private content?: string | Buffer) {
        super({
            read: (size?: number) => {
                if (!this.content) {
                    this.push(null);
                } else {
                    this.push(this.content.slice(0, size));
                    this.content = this.content.slice(size);
                }
            }
        });
    }
} (Buffer.from('The quick brown fox jumps over the lazy dog.\n'));

reader.pipe(spyingAlphaTransformStream)
    .pipe(process.stdout);

The problem is with the middle stream, spyingAlphaTransformStream. It listens for the pipe event, then pauses the readable stream and resumes it after 1 second. After it unpipes the readable stream and then pipes from it again, nothing is written to standard output. That means the transform method of spyingAlphaTransformStream is never called, so something in the stream chain is broken.

I expect the output to look something like:

Data event listeners count: 0
Readable state of reader: true
We paused the reader!!
We unpaused the reader!!
Data event listeners count: 1
Readable state of reader: true
Before transform:
The quick brown fox jumps over the lazy dog.

After transform:
 THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.

THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.

But it actually looks like:

Data event listeners count: 0
Readable state of reader: true
We paused the reader!!
We unpaused the reader!!
Data event listeners count: 1
Readable state of reader: true

From that, all I can conclude is that nothing is being piped from the readable stream.

How can I fix this?

package.json

{
  "name": "hello-stream",
  "version": "1.0.0",
  "main": "main.ts",
  "scripts": {
    "start": "npm run build:live",
    "build:live": "nodemon"
  },
  "keywords": [
    "typescript",
    "nodejs",
    "ts-node",
    "cli",
    "node",
    "hello"
  ],
  "license": "WTFPL",
  "devDependencies": {
    "@types/node": "^7.0.21",
    "nodemon": "^1.11.0",
    "ts-node": "^3.0.4",
    "typescript": "^2.3.2"
  },
  "dependencies": {}
}

nodemon.json

{
    "ignore": ["node_modules"],
    "delay": "2000ms",
    "execMap": {
        "ts": "ts-node"
    },
    "runOnChangeOnly": false,
    "verbose": true
}

tsconfig.json

{
  "compilerOptions": {
    "target": "es2015",
    "module": "commonjs",
    "typeRoots": ["node_modules/@types"],
    "lib": ["es6", "dom"],

    "strict": true,
    "noUnusedLocals": true,
    "types": ["node"]
  }
}

asked Jun 26 '17 17:06 by smac89


1 Answer

The solution was surprisingly simpler than I had expected. I had to find a way to defer the callback passed to the transform method, and wait until the stream was "ready" before invoking that initial callback.

Basically, the spying transform stream (called LoggingStream in the longer example) keeps a boolean flag that says whether the stream is ready. If it is not ready when the transform method is called, I store a closure on the class that will later invoke the first callback I received. Because that first callback has not been invoked yet, the stream receives no further transform calls, i.e. there is only ever one pending callback to worry about. It is then simply a waiting game until the stream indicates that it is ready (this is done with a simple setTimeout).

When the stream is "ready", I set the ready flag to true and call the pending callback (if set). From this point on, data flows through the entire stream chain.
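Reduced to its core, the idea can be sketched roughly as follows. GatedStream, its open() method and the Done type are hypothetical names picked for illustration, and the chunks pass through unchanged; this is a minimal sketch of the deferral, not the exact code I ended up with.

```typescript
import { Transform } from 'stream';

// Callback shape used by Transform (declared locally to stay version-agnostic).
type Done = (error?: Error | null, data?: any) => void;

// Hypothetical pass-through stream that holds its first chunk until open() is called.
class GatedStream extends Transform {
    private ready = false;
    private pending?: () => void;

    _transform(chunk: any, _encoding: string, callback: Done): void {
        const forward = () => callback(null, chunk);
        if (this.ready) {
            forward();
        } else {
            // Not invoking the callback keeps further chunks queued upstream.
            this.pending = forward;
        }
    }

    // Mark the stream as ready and release the held chunk, if any.
    open(): void {
        this.ready = true;
        const pending = this.pending;
        this.pending = undefined;
        if (pending) {
            pending();
        }
    }
}

const gate = new GatedStream();
const received: string[] = [];
gate.on('data', (chunk: Buffer) => received.push(chunk.toString()));

gate.write('first');  // held: its callback is stored, not invoked
gate.write('second'); // queued by the writable side until 'first' completes

// Open the gate after one second; both chunks then flow through in order.
setTimeout(() => gate.open(), 1000);
```

Because the writable side only hands over the next chunk once the previous callback has run, a single stored closure is enough to stall everything upstream.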

I have a longer example to show how this works:

import {Transform, Readable} from 'stream';

const alphaTransform = new class extends Transform {
    constructor() {
        super({
            objectMode: true,
            transform: (chunk: string | Buffer, encoding: string, callback: Function) => {
                let transformed: IterableIterator<string>;
                if (Buffer.isBuffer(chunk)) {
                    transformed = function* () {
                        for (const val of chunk) {
                            yield String.fromCharCode(val);
                        }
                    }();
                } else {
                    transformed = chunk[Symbol.iterator]();
                }
                callback(null,
                    Array.from(transformed).map(s => s.toUpperCase()).join(''));
            }
        });
    }
}

class LoggingStream extends Transform {
    private pending?: () => void; // set only while a chunk is being held back
    private isReady = false;

    constructor(message: string) {
        super({
            objectMode: true,
            transform: (chunk: string | Buffer, encoding: string, callback: Function) => {
                // Log the chunk and pass it downstream unchanged.
                const logAndForward = () => {
                    console.log(message);
                    const text = Buffer.isBuffer(chunk) ? chunk.toString('utf-8') : chunk;
                    console.log(`[${new Date().toTimeString()}]: ${text}`);
                    callback(null, chunk);
                };
                if (!this.isReady) { // ready flag
                    // Not ready yet: keep the work as the pending callback.
                    this.pending = logAndForward;
                } else {
                    logAndForward();
                }
            }
        });

        this.on('pipe', this.pauseOnPipe);
    }

    private pauseOnPipe() {
        this.removeListener('pipe', this.pauseOnPipe);
        setTimeout(() => {
            this.isReady = true; // set ready flag to true
            if (this.pending) { // execute pending callbacks (if any) 
                this.pending();
            }
        }, 3000); // wait three seconds
    }
}

const reader = new class extends Readable {
    constructor(private content?: string | Buffer) {
        super({
            read: (size?: number) => {
                if (!this.content) {
                    this.push(null);
                } else {
                    this.push(this.content.slice(0, size));
                    this.content = this.content.slice(size);
                }
            }
        });
    }
} (Buffer.from('The quick brown fox jumps over the lazy dog.\n'));

reader.pipe(new LoggingStream("Before transformation:"))
    .pipe(alphaTransform)
    .pipe(new LoggingStream("After transformation:"))
    .pipe(process.stdout);

Output

<Waits about 3 seconds...>

Before transformation:
[11:13:53 GMT-0600 (CST)]: The quick brown fox jumps over the lazy dog.

After transformation:
[11:13:53 GMT-0600 (CST)]: THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.

THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.

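Note: since JS is single-threaded and the whole chain is piped together in one synchronous statement, both LoggingStream instances start their timers at essentially the same moment, so they wait the same amount of time before continuing.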
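A small sketch of why the two delays line up: pipe() emits the 'pipe' event synchronously, so when a chain is built in one statement, every stage's 'pipe' handler (and any setTimeout it schedules) has already run by the time the statement finishes. The stream and variable names below are illustrative and do not come from the example above.

```typescript
import { PassThrough } from 'stream';

const events: string[] = [];

const source = new PassThrough();
const first = new PassThrough();
const second = new PassThrough();

// Each stage records when it is piped into.
first.on('pipe', () => events.push('first piped'));
second.on('pipe', () => events.push('second piped'));

// Build the chain in a single statement, as in the example above.
source.pipe(first).pipe(second);
events.push('chain built');

// Both 'pipe' handlers ran before 'chain built' was recorded.
console.log(events);
```

Timers started from those handlers are therefore scheduled within the same tick and expire together.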

answered Nov 15 '22 05:11 by smac89