I have a CSV parser implemented as a series of transform streams:
process.stdin
  .pipe(iconv.decodeStream('win1252'))
  .pipe(csv.parse())
  .pipe(buildObject())
  .pipe(process.stdout);
I'd like to abstract the parser (in its own module) and be able to do:
process.stdin
  .pipe(parser)
  .pipe(process.stdout);
where parser is just the composition of the previously used transform streams.
If I do

var parser = iconv.decodeStream('win1252')
  .pipe(csv.parse())
  .pipe(buildObject());

then parser refers to the buildObject() stream (since .pipe() returns its destination), and only that last transform stream receives the piped data.
If I do

var parser = iconv.decodeStream('win1252');
parser
  .pipe(csv.parse())
  .pipe(buildObject());

it doesn't work either: .pipe(process.stdout) would then be called on the first transform stream, so the output of the other two never reaches stdout.
Any recommendation for an elegant composition of streams?
As of 2022 and Node.js v16, there is a new compose function in the stream module that builds a Duplex stream from a list of streams.

See: https://nodejs.org/api/stream.html#streamcomposestreams

It works with .pipe() and async syntax.
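
For example, the question's parser could be built like this (a minimal sketch, assuming the same iconv, csv, and buildObject() streams as in the question; note that stream.compose is still marked experimental in the docs):

const { compose } = require('stream');

// Combine the three transform streams into a single Duplex stream:
// writes go into the first stream, reads come out of the last one.
const parser = compose(
  iconv.decodeStream('win1252'),
  csv.parse(),
  buildObject()
);

process.stdin
  .pipe(parser)
  .pipe(process.stdout);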
I think this can be done natively now, using only the built-in stream module:
const { PassThrough, Transform } = require('stream');

const compose = (...streams) => {
  const first = new PassThrough();
  const last = new PassThrough();
  const result = new Transform();

  // Pipe the streams together, forwarding every error to the composed stream.
  [first, ...streams, last].reduce((chain, stream) => {
    stream.on('error', (error) => result.emit('error', error));
    return chain.pipe(stream);
  });

  // Forward everything that leaves the chain to the readable side.
  // (A once('data') listener per chunk would lose data whenever one
  // input chunk produces zero or several output chunks.)
  last.on('data', (chunk) => result.push(chunk));

  // Feed incoming chunks into the head of the chain.
  result._transform = (chunk, enc, cb) => {
    first.write(chunk, enc, cb);
  };

  // End the chain and wait for it to drain before finishing.
  result._flush = (cb) => {
    last.once('end', () => cb());
    first.end();
  };

  return result;
};
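
This is used the same way as the built-in version, and is easy to export from its own module as the question asks. A usage sketch, assuming the same streams as in the question (parser.js is a hypothetical module name):

// parser.js
module.exports = () => compose(
  iconv.decodeStream('win1252'),
  csv.parse(),
  buildObject()
);

// main.js
process.stdin
  .pipe(require('./parser')())
  .pipe(process.stdout);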