When writing NodeJS Transform stream in transform function how can I know if the chunk is the last one or there is no any new chunk coming.
_transform(chunk: any, encoding: string, callback: Function): void {
// accumulating chunks here to buffer
// so that I need to do some processing on the whole buffer
// and I need to understand when to do that
}
So I need to know when the chunks coming to the Stream are over, to do some processing on the buffer composed of all the chunks and then push the processed data from the stream.
In the _transform
you can not figure out if there will be more data or not.
Depending on your use case you would either listen to the end
event, or use _flush
:
Stream: transform._flush(callback):
Custom Transform implementations may implement the
transform._flush()
method. This will be called when there is no more written data to be consumed, but before the'end'
event is emitted signaling the end of the Readable stream.Within the
transform._flush()
implementation, thereadable.push()
method may be called zero or more times, as appropriate. The callback function must be called when the flush operation is complete.
Example,
const COUNTER_NULL_SYMBOL = Symbol('COUNTER_NULL_SYMBOL');
const Counter = () => {
let data = COUNTER_NULL_SYMBOL;
let counter = 1;
let first = true;
const counterStream = new Transform({
objectMode: true,
decodeStrings: false,
highWaterMark: 1,
transform(chunk, encoding, callback) {
if (data === COUNTER_NULL_SYMBOL) {
data = chunk;
return callback();
} else {
this.push({data, counter, last: false, first});
first = false;
counter++;
data = chunk;
return callback();
}
},
});
counterStream._flush = function (callback) {
if (data === COUNTER_NULL_SYMBOL) {
return callback();
} else {
this.push({data, counter, last: true, first});
return callback();
}
};
return counterStream;
};
Using through2
fs.createReadStream('/tmp/important.dat')
.pipe(through2(
(chunk, enc, cb) => cb(null, chunk), // transform is a noop
function (cb) { // FLUSH FUNCTION
this.push('tacking on an extra buffer to the end');
cb();
}
))
.pipe(fs.createWriteStream('/tmp/wut.txt'));
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With