
How to create a Duplex stream in nodejs?

I create a new Duplex stream like this

const Duplex = require('stream').Duplex;
let myStream = new Duplex()

Via a Websocket I receive chunks/buffer which I add to the stream like this every time a new chunk comes in via the Websocket:

myStream.push(buffer)

I then pipe the stream to another process (ffmpeg in this example):

myStream.pipe(process.stdout);

This causes the error NodeError: The _read() method is not implemented. I understand what the error says, but not why I have to implement _read() or how I should do it. I also see that the Duplex class constructor accepts a read function, but why is this necessary? I just want to continuously push chunks into the stream and then pipe it to another process.

curiousMind asked Nov 07 '19 11:11


1 Answer

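A Node.js Duplex stream requires the implementer to specify both a write and a read method. pipe() pulls data from the stream by calling read(), which delegates to _read(), and the default _read() in the base class does nothing except raise the "not implemented" error: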

import stream from 'stream';

const duplex = new stream.Duplex({
  // Method shorthand (not an arrow function) so that `this` is the stream.
  write(chunk, encoding, next) {
    // Handles data written or piped into this duplex stream. Do something
    // with the chunk, then call next() to indicate that it has been
    // processed. Written data does not reach the readable side on its own;
    // the two sides of a Duplex are independent.
    next();
  },
  read(size) {
    // Add new data to be read by streams piped from this duplex.
    this.push('some data');
  }
});

The official Node.js documentation on implementing streams is available here: API for Stream Implementers
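To make the split between the two sides concrete, here is a minimal sketch rather than a definitive implementation. The helper makeDuplex and the names outgoing and received are hypothetical, and the block uses require instead of import so that it runs as a plain script. The readable side emits a fixed list of chunks, while the writable side only records what it is given.

```javascript
const { Duplex } = require('node:stream');

// Hypothetical helper for illustration: the readable side emits the items
// in `outgoing`, the writable side records incoming chunks in `received`.
function makeDuplex(outgoing, received) {
  const queue = [...outgoing];
  return new Duplex({
    write(chunk, encoding, next) {
      // Writable side: chunks arrive as Buffers by default.
      received.push(chunk.toString());
      next();
    },
    read() {
      // Readable side: hand out the next queued item, or null to signal
      // that there is no more data.
      this.push(queue.length > 0 ? queue.shift() : null);
    },
  });
}

const received = [];
const duplex = makeDuplex(['out-1', 'out-2'], received);

duplex.write('in-1');             // goes to write(), not to the readable side
const firstChunk = duplex.read(); // comes from read(), not from what was written
```

Nothing written to the stream shows up on the readable side here. If the written data should come out the other end, that forwarding has to be coded explicitly.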

The websocket scenario
The websocket example described above should probably use a Readable rather than a Duplex stream. Duplex streams are useful in store-and-forward or process-and-forward scenarios, where the stream both accepts and produces data. However, it sounds like the stream in the websocket example is used solely to move data from the websocket to a stream interface, so nothing is ever written to it. This can be achieved using a Readable:


import stream from 'stream';

const onSocketConnection = (socket) => {
  const readable = new stream.Readable({
    // The read logic is omitted since the data is pushed into the stream
    // from the socket's message handler, outside of the stream's control.
    // However, the read() function must be defined.
    read() {}
  });

  socket.on('message', (data) => {
    // Push the data onto the readable queue
    readable.push(data);
  });

  // `ffmpeg` is assumed to be a writable stream defined elsewhere,
  // for example the stdin of a spawned ffmpeg process.
  readable.pipe(ffmpeg);
};
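The following is a runnable sketch of the same pattern that needs neither a websocket server nor ffmpeg. It rests on a few assumptions: an EventEmitter stands in for the socket, a collecting Writable stands in for ffmpeg's input, and the socket is assumed to emit a 'close' event when the connection ends. The names socketToReadable, fakeSocket, and sink are hypothetical. It uses require instead of import so that it runs as a plain script.

```javascript
const { EventEmitter } = require('node:events');
const { Readable, Writable } = require('node:stream');

// Wraps any emitter that fires 'message' (and, by assumption, 'close')
// events in a Readable stream.
function socketToReadable(socket) {
  const readable = new Readable({
    // Data arrives from the socket instead of being pulled on demand,
    // so read() has nothing to do. It still has to exist.
    read() {},
  });
  socket.on('message', (data) => readable.push(data));
  // Assumption: 'close' marks the end of the connection. push(null) ends
  // the stream, which in turn ends the piped destination.
  socket.on('close', () => readable.push(null));
  return readable;
}

// Stand-ins for the real websocket and for ffmpeg's input stream.
const fakeSocket = new EventEmitter();
const chunks = [];
const sink = new Writable({
  write(chunk, encoding, next) {
    chunks.push(chunk);
    next();
  },
});
sink.on('finish', () => {
  console.log(`sink received ${Buffer.concat(chunks).length} bytes`);
});

socketToReadable(fakeSocket).pipe(sink);
fakeSocket.emit('message', Buffer.from('first '));
fakeSocket.emit('message', Buffer.from('second'));
fakeSocket.emit('close');
```

Note that push() returns false once the internal buffer exceeds the stream's highWaterMark. This sketch ignores that return value, so a fast socket paired with a slow consumer will buffer data in memory. With a real ffmpeg child process started through child_process.spawn, the pipe destination would be the child's stdin stream.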
jorgenkg answered Oct 24 '22 15:10