transform
Transform streams are both readable and writable, which makes them a good fit for the 'middle' of a pipe chain. For this reason, they are sometimes referred to as through streams. They are similar to duplex streams in this way, except they provide a convenient interface for manipulating the data rather than just passing it through. The purpose of a transform stream is to manipulate the data as it is piped through the stream. You may want to make some async calls, for example, or derive a couple of fields, remap some things, etc.
For how to create a transform stream see here and here. All you have to do is implement a _transform method, which takes (chunk, encoding, callback). The chunk is your data. Most of the time you won't need to worry about encoding if you are working with objectMode = true. You call the callback when you are done processing the chunk; the chunk you hand to it is then pushed on to the next stream.
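As a rough illustration of the _transform contract described above, here is a minimal sketch using the `transform` constructor option of Node's built-in `stream.Transform`. The `createTotalStream` name and the `price`, `qty` and `total` fields are made up for the example.

```javascript
const { Transform } = require('stream');

// Hypothetical example: derive a `total` field on every record.
function createTotalStream() {
  return new Transform({
    objectMode: true, // chunks are plain objects, so encoding can be ignored
    transform(chunk, encoding, callback) {
      const out = Object.assign({}, chunk, { total: chunk.price * chunk.qty });
      // First argument is an error (or null), second is the transformed chunk.
      callback(null, out);
    }
  });
}

const totals = createTotalStream();
totals.write({ price: 2, qty: 3 });
console.log(totals.read()); // { price: 2, qty: 3, total: 6 }
```

In a real chain you would normally pipe into and out of the stream rather than calling write() and read() by hand; the manual calls just keep the sketch short.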
If you want a helper module that makes through streams really easy to write, I suggest through2.
For error handling, keep reading.
pipe
In a pipe chain, handling errors is indeed non-trivial. According to this thread, .pipe() is not built to forward errors. So something like ...
var a = createStream();
a.pipe(b).pipe(c).on('error', function(e){handleError(e)});
... would only listen for errors on the stream c. If an error event was emitted on a, that would not be passed down and, in fact, would throw. To do this correctly:
var a = createStream();
a.on('error', function(e){handleError(e)})
  .pipe(b)
  .on('error', function(e){handleError(e)})
  .pipe(c)
  .on('error', function(e){handleError(e)});
Now, though the second way is more verbose, you can at least keep the context of where your errors happen. This is usually a good thing.
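To make the difference concrete, here is a small sketch that emits an error on the first stream of a chain and records who sees it. It uses PassThrough streams as stand-ins for a, b and c; `emitOnFirst` is a hypothetical helper, and emitting 'error' by hand is only done here to keep the demo synchronous.

```javascript
const { PassThrough } = require('stream');

// Hypothetical helper: build a -> b -> c, attach error handlers only to the
// streams named in `watched`, then emit an error on `a`.
function emitOnFirst(watched) {
  const log = [];
  const streams = {
    a: new PassThrough(),
    b: new PassThrough(),
    c: new PassThrough()
  };
  for (const name of watched) {
    streams[name].on('error', (e) => log.push(name + ': ' + e.message));
  }
  streams.a.pipe(streams.b).pipe(streams.c);
  try {
    streams.a.emit('error', new Error('boom'));
  } catch (e) {
    // With no listener on `a`, the EventEmitter throws the error.
    log.push('thrown: ' + e.message);
  }
  return log;
}

console.log(emitOnFirst(['c']));           // [ 'thrown: boom' ]
console.log(emitOnFirst(['a', 'b', 'c'])); // [ 'a: boom' ]
```

Note that the handler on c never fires in either case; only the listener attached to the stream that actually failed is called, and it also tells you where the failure happened.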
If you only want to capture errors at the destination and don't care so much about where they happened, one library I find helpful is event-stream.
end
When an error event is emitted, the end event is not emitted (explicitly). Emitting an error effectively ends the stream.
domains
In my experience, domains work really well most of the time. If you have an unhandled error event (i.e. emitting an error on a stream without a listener), the server can crash. Now, as the above article points out, you can wrap the stream in a domain which should properly catch all errors.
var d = domain.create();
d.on('error', handleAllErrors);
d.run(function() {
  fs.createReadStream(tarball)
    .pipe(gzip.Gunzip())
    .pipe(tar.Extract({ path: targetPath }))
    .on('close', cb);
});
The beauty of domains is that they preserve the stack traces, though event-stream does a good job of this as well.
For further reading, check out the stream-handbook. Pretty in depth, but super useful and gives some great links to lots of helpful modules.
If you are using Node >= v10.0.0, you can use stream.pipeline and stream.finished.
For example:
const { pipeline, finished } = require('stream');

pipeline(
  input,
  transformA,
  transformB,
  transformC,
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  });

finished(input, (err) => {
  if (err) {
    console.error('Stream failed', err);
  } else {
    console.log('Stream is done reading');
  }
});
See this GitHub PR for more discussion.
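For a version you can run as-is, here is a sketch of stream.pipeline reporting a failure from the middle of the chain. The `runPipeline` helper, the doubling transform and the "negative value" rule are all made up for the example, and Readable.from needs a somewhat newer Node (v12+) than pipeline itself.

```javascript
const { pipeline, Readable, Transform, Writable } = require('stream');

// Hypothetical helper: push `values` through a transform that doubles each
// number and fails on negatives. Resolves with the pipeline error (if any)
// and whatever reached the sink.
function runPipeline(values) {
  return new Promise((resolve) => {
    const seen = [];
    const source = Readable.from(values);
    const doubler = new Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        if (chunk < 0) {
          // Passing an error to the callback fails the whole pipeline.
          return callback(new Error('negative value: ' + chunk));
        }
        callback(null, chunk * 2);
      }
    });
    const sink = new Writable({
      objectMode: true,
      write(chunk, encoding, callback) {
        seen.push(chunk);
        callback();
      }
    });
    // A single callback receives an error from any stream in the chain.
    pipeline(source, doubler, sink, (err) => resolve({ err, seen }));
  });
}

runPipeline([1, 2, 3]).then(({ err, seen }) => {
  console.log(err ? 'failed' : 'ok', seen); // ok [ 2, 4, 6 ]
});
runPipeline([1, -1, 3]).then(({ err }) => {
  console.log(err.message); // negative value: -1
});
```

Unlike a bare .pipe() chain, pipeline also destroys the other streams when one of them fails, so there is one place to handle errors and nothing is left half-open.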
Domains are deprecated; you don't need them.
For this question, the distinction between transform and writable streams is not so important.
mshell_lauren's answer is great, but as an alternative you can also explicitly listen for the error event on each stream you think might error, and reuse the handler function if you prefer.
var a = createReadableStream()
var b = anotherTypeOfStream()
var c = createWriteStream()
a.on('error', handler)
b.on('error', handler)
c.on('error', handler)
a.pipe(b).pipe(c)
function handler (err) { console.log(err) }
Doing so prevents the infamous uncaught exception should one of those streams emit its error event.
Errors from the whole chain can be propagated to the rightmost stream using a simple function:
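function safePipe (readable, transforms) {
  // Note: shift() empties the caller's transforms array as it goes.
  while (transforms.length > 0) {
    var new_readable = transforms.shift();
    // Forward errors downstream so they surface on the last stream.
    readable.on("error", function(e) { new_readable.emit("error", e); });
    readable.pipe(new_readable);
    readable = new_readable;
  }
  return readable;
}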
which can be used like:
safePipe(readable, [ transform1, transform2, ... ]);
.on("error", handler) only takes care of stream errors. If you are using custom Transform streams, .on("error", handler) doesn't catch errors thrown inside the _transform function. So one can do something like this to control application flow:
The this keyword in the _transform function refers to the stream itself, which is an EventEmitter. So you can use try/catch like below to catch the errors and later pass them on to custom event handlers.
// CustomTransform.js
CustomTransformStream.prototype._transform = function (data, enc, done) {
  var stream = this
  try {
    // Do your transform code
  } catch (e) {
    // Based on the error type, emit one of these (use an if or switch statement)
    stream.emit("CTError1", e)
    stream.emit("CTError2", e)
  }
  done()
}
// StreamImplementation.js
someReadStream
  // Pipe into an instance (assumes the constructor needs no arguments)
  .pipe(new CustomTransformStream())
  .on("CTError1", function (e) { console.log(e) })
  .on("CTError2", function (e) { /* Let's do something else */ })
  .pipe(someWriteStream)
This way, you can keep your logic and error handlers separate. Also, you can opt to handle only some errors and ignore others.
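Here is a self-contained sketch of the same idea that you can run directly. The `createJsonParser` name and the 'parseError' event are made up for the example; the only real API used is Node's built-in `stream.Transform`.

```javascript
const { Transform } = require('stream');

// Hypothetical transform: parses JSON strings and reports bad input on a
// custom 'parseError' event instead of the built-in 'error' event.
function createJsonParser() {
  return new Transform({
    objectMode: true,
    transform(line, encoding, done) {
      try {
        this.push(JSON.parse(line));
      } catch (e) {
        // A custom event does not destroy the stream or stop the pipe.
        this.emit('parseError', e, line);
      }
      done();
    }
  });
}

const parser = createJsonParser();
const rejected = [];
parser.on('parseError', (e, line) => rejected.push(line));
parser.write('not json');
parser.write('{"ok":true}');
console.log(rejected);      // [ 'not json' ]
console.log(parser.read()); // { ok: true }
```

The trade-off is that a custom event keeps the stream alive after a bad chunk, whereas passing the error to done(e) would surface it as a regular 'error' event and stop the stream.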
UPDATE
Alternative: RXJS Observable