
How to emit/pipe array values as a readable stream in node.js?

What is the best way to create a readable stream from an array and pipe values to a writable stream? I have seen substack's example using setInterval and I can implement that successfully using 0 for the interval value, but I am iterating over a lot of data and triggering gc every time is slowing things down.

// Working with the setInterval wrapper
var Stream = require('stream');
var arr = [1, 5, 3, 6, 8, 9];

function createStream () {
    var t = new Stream();
    t.readable = true;
    var times = 0;
    // Emit one array item per timer tick, then signal the end
    var iv = setInterval(function () {
        t.emit('data', arr[times]);
        if (++times === arr.length) {
            t.emit('end');
            clearInterval(iv);
        }
    }, 0);
    return t;
}

// Create the writable stream s
// ....

createStream().pipe(s);

What I would like to do is emit values without the setInterval. Perhaps using the async module like this:

async.forEachSeries(arr, function (item, cb) {
    t.emit('data', item);
    cb();
}, function (err) {
    if (err) {
        console.log(err);
    }
    t.emit('end');
});

In this case I iterate the array and emit data, but never pipe any values. I have already seen shinout's ArrayStream, but I think that was created before v0.10 and it is a bit more overhead than I am looking for.

TankofVines asked May 31 '13 02:05


1 Answer

You can solve this problem by creating a readable stream and pushing values into it.

Streams are a pain, but it's often easier to work with them directly than to use libraries.

Array of strings or buffers to stream

If you're working with an array of strings or buffers, this will work:

'use strict'
const Stream = require('stream')
const readable = new Stream.Readable()

readable.pipe(process.stdout)

const items = ['a', 'b', 'c']
items.forEach(item => readable.push(item))

// no more data
readable.push(null)

Notes:

  • readable.pipe(process.stdout) does two things: it puts the stream into "flowing" mode, and it sets up the process.stdout writable stream to receive data from readable.
  • The Readable#push method is for the creator of the readable stream, not for the stream's consumer.
  • You have to call Readable#push(null) to signal that there is no more data.
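
Pushing every item up front, as above, buffers the whole array inside the stream before anything is consumed. For a large array, one alternative is to give the stream a read function that pushes a single item each time the stream asks for more. The following is only a sketch: the helper name arrayToStream is made up for illustration, and it assumes a Node version whose Readable constructor accepts a read option.

```javascript
'use strict'
const { Readable } = require('stream')

// Hypothetical helper (not part of Node): wraps an array of strings or
// buffers in a readable stream that hands out one item per read request.
function arrayToStream (items) {
  let index = 0
  return new Readable({
    // Called by the stream whenever it wants more data.
    read () {
      // Push the next item, or null once the array is exhausted.
      this.push(index < items.length ? items[index++] : null)
    }
  })
}

arrayToStream(['a', 'b', 'c']).pipe(process.stdout)
```

Because read is only called when the stream's internal buffer has room, a slow consumer should hold back how quickly the array is walked.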

Array of non-strings to stream

To make a stream from an array of things that are neither strings nor buffers, you need both the readable stream and the writable stream to be in "Object Mode". In the example below, I made the following changes:

  • Initialize the readable stream with {objectMode: true}
  • Instead of piping to process.stdout, pipe to a simple writable stream that is in object mode.

'use strict'
const Stream = require('stream')

const readable = new Stream.Readable({objectMode: true})

const writable = new Stream.Writable({objectMode: true})
writable._write = (object, encoding, done) => {
  console.log(object)

  // ready to process the next chunk
  done()
}

readable.pipe(writable)

const items = [1, 2, 3]
items.forEach(item => readable.push(item))

// end the stream
readable.push(null)
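
On newer Node versions, stream.Readable.from() can build the readable side directly from an array (or any iterable), and it uses object mode by default. I believe it was added in Node 12.3, so treat this as a sketch for versions that have it. The received array is an addition of mine, there only to make the result easy to inspect.

```javascript
'use strict'
const { Readable, Writable } = require('stream')

// Readable.from() accepts any iterable and defaults to object mode.
const readable = Readable.from([1, 2, 3])

// Illustrative only: collects what the writable side was given.
const received = []

const writable = new Writable({
  objectMode: true,
  write (object, encoding, done) {
    received.push(object)
    console.log(object)

    // ready to process the next chunk
    done()
  }
})

readable.pipe(writable)
```

There is no push(null) here; the stream ends by itself once the array has been fully iterated.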
    

Performance Note

Where is the data coming from? If it's a streaming data source, it's better to manipulate the stream using a transform stream than to convert to/from an array.
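
As a rough sketch of that suggestion, a transform stream in object mode can change each value as it passes through. The doubling step and the names doubler, source, sink and results are made up for illustration, and the source is still fed from an array purely to keep the example self-contained.

```javascript
'use strict'
const { Readable, Transform, Writable } = require('stream')

// Illustrative transform: doubles each number flowing through it.
const doubler = new Transform({
  objectMode: true,
  transform (chunk, encoding, done) {
    // First argument is an error (none here), second is the output value.
    done(null, chunk * 2)
  }
})

// Stand-in for a real streaming source; read is a no-op because the
// data is pushed in from outside.
const source = new Readable({ objectMode: true, read () {} })

const results = []
const sink = new Writable({
  objectMode: true,
  write (chunk, encoding, done) {
    results.push(chunk)
    console.log(chunk)
    done()
  }
})

source.pipe(doubler).pipe(sink)

const items = [1, 2, 3]
items.forEach(item => source.push(item))

// end the stream
source.push(null)
```

With a real streaming source, source would be replaced by that stream and the rest of the pipeline would stay the same.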

Max Heiber answered Oct 05 '22 23:10