
How to compose transform Streams in node.js

Tags: stream, node.js

I have a CSV parser implemented as a series of transform streams:

process.stdin
    .pipe(iconv.decodeStream('win1252'))
    .pipe(csv.parse())
    .pipe(buildObject())
    .pipe(process.stdout);

I'd like to abstract the parser (in its own module) and be able to do:

process.stdin
    .pipe(parser)
    .pipe(process.stdout);

where parser is just the composition of the previously used transform streams.

If I do

var parser = iconv.decodeStream('win1252')
    .pipe(csv.parse())
    .pipe(buildObject());

then parser is set to the buildObject() stream, because .pipe() returns its destination, so only this last transform stream receives the data.

If I do

var parser = iconv.decodeStream('win1252');
parser
    .pipe(csv.parse())
    .pipe(buildObject());

it doesn't work either: .pipe(process.stdout) is then called on the first transform stream, and the other two are bypassed.
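Both attempts fail for the same reason: .pipe() returns its destination, so a chained expression evaluates to the last stream, while a reference to the head stays the head. A minimal sketch of that behaviour, using PassThrough streams as hypothetical stand-ins for the decode, parse and build stages (the names decode, parse, build and tail are made up for the illustration):

```javascript
const { PassThrough } = require('stream');

// Hypothetical stand-ins for iconv.decodeStream(), csv.parse() and buildObject().
const decode = new PassThrough();
const parse = new PassThrough();
const build = new PassThrough();

// .pipe() returns its destination, so the chained expression yields the last stream.
const tail = decode.pipe(parse).pipe(build);

// true: data written to `tail` never passes through decode or parse.
console.log(tail === build);

// false: the head is a different object, and reading from it yields data
// that has not yet passed through parse or build.
console.log(tail === decode);
```

So a single variable can hold either the writable end (the head) or the readable end (the tail) of the chain, but not both, which is why some wrapper stream is needed.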

Any recommendation for an elegant composition of streams?

Laurent VB asked May 16 '14 14:05




2 Answers

As of 2022 and Node.js v16, the stream module has a compose function that builds a single Duplex stream from a list of streams.

See: https://nodejs.org/api/stream.html#streamcomposestreams

The composed stream works with .pipe() and with async iteration (for await...of).
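As a rough sketch of how that could look, here is a composed stream used with both .pipe() and for await...of. The stages upperCase and exclaim, and the helpers makeParser and collect, are hypothetical stand-ins invented for this example; in the question they would be iconv.decodeStream('win1252'), csv.parse() and buildObject(). It assumes a Node.js version that exports stream.compose.

```javascript
const { compose, Readable, Transform } = require('stream');

// Hypothetical stand-in stage: upper-cases every chunk.
const upperCase = () => new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

// Hypothetical stand-in stage: appends "!" to every chunk.
const exclaim = () => new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString() + '!');
  },
});

// A factory, so that every call returns a fresh composed Duplex stream.
const makeParser = () => compose(upperCase(), exclaim());

// Pipe the input through the composed stream and gather the output
// with async iteration.
async function collect(input) {
  const parts = [];
  for await (const chunk of Readable.from(input).pipe(makeParser())) {
    parts.push(chunk.toString());
  }
  return parts.join('');
}

collect(['a', 'b']).then((text) => console.log(text)); // prints "A!B!"
```

Exporting a factory such as makeParser from the module, rather than one shared stream instance, is a design choice for the sketch: a stream can only be consumed once, so each caller gets its own.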

Elmatou answered Sep 28 '22 03:09


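I think this can now be done with the built-in stream module alone:

const { PassThrough, Transform } = require('stream');

const compose = (...streams) => {
  // These streams work on buffers/strings; pass { objectMode: true } to the
  // constructors below if the composed stages exchange objects.
  const first = new PassThrough();
  const last = new PassThrough();
  const result = new Transform();

  // Chain first -> ...streams -> last and surface every stage's error on result.
  [first, ...streams, last].reduce((chain, stream) => {
    stream.on('error', (error) => result.emit('error', error));
    return chain.pipe(stream);
  });

  // Forward all output, whether a stage emits zero, one or many chunks per input.
  last.on('data', (chunk) => result.push(chunk));

  // Feed each incoming chunk into the head of the chain.
  result._transform = (chunk, enc, cb) => first.write(chunk, enc, cb);

  // End the chain and finish only once the last stage has drained.
  result._flush = (cb) => {
    last.once('end', () => cb());
    first.end();
  };

  return result;
};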
Aravindan Ve answered Sep 28 '22 02:09