I create a new Duplex stream like this:
const Duplex = require('stream').Duplex;
let myStream = new Duplex()
I receive chunks (buffers) over a WebSocket, and each time a new chunk arrives I add it to the stream like this:
myStream.push(buffer)
I then pipe the stream to another process (ffmpeg in this example):
myStream.pipe(process.stdout);
This causes the error NodeError: The _read() method is not implemented
I understand what the error says, but I don't understand why I need to implement _read() or how I should do it. I also see that you can pass a read function to the Duplex constructor, but why is this necessary? I just want to continuously push chunks into the stream and then pipe it to another process.
A Node.js Duplex stream requires the implementer to provide both a write and a read method:
import stream from 'stream';
const duplex = new stream.Duplex({
  write(chunk, encoding, next) {
    // Do something with the chunk and then call next() to indicate
    // that the chunk has been processed. The write() fn handles
    // data piped into this duplex stream. The readable side is
    // independent: nothing reaches it unless it is pushed explicitly.
    next();
  },
  read(size) {
    // Add new data to be read by streams piped from this duplex
    this.push("some data");
  }
});
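To make the relationship between the two sides concrete, here is a minimal sketch of a Duplex whose write() forwards each chunk to its own readable side. The helper name createForwardingDuplex is made up for this illustration, and the no-op read() reflects the assumption that data only ever arrives through write():

```javascript
const { Duplex } = require('stream');

// Hypothetical helper (not part of the original answer): builds a Duplex
// whose writable side forwards every chunk to its own readable side.
function createForwardingDuplex() {
  return new Duplex({
    write(chunk, encoding, next) {
      this.push(chunk); // make the written chunk available to readers
      next();           // signal that this chunk has been handled
    },
    final(done) {
      this.push(null);  // writable side ended, so end the readable side too
      done();
    },
    read() {
      // No-op: data arrives via write(), not on demand.
    },
  });
}

const duplex = createForwardingDuplex();
const received = [];
duplex.on('data', (chunk) => received.push(chunk.toString()));
duplex.on('end', () => console.log(received.join('')));

duplex.write('hello ');
duplex.end('world');
```

If plain forwarding is all you need, Node's built-in stream.PassThrough already behaves this way, so a hand-written Duplex is mostly useful when write() has to transform or buffer the data first.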
The official Node.js stream documentation covers this in its "API for Stream Implementers" section.
The websocket scenario
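The websocket example described above should probably use a Readable rather than a Duplex stream. Duplex streams are useful in store-and-forward or process-and-forward scenarios. However, it sounds like the stream in the websocket example is used solely to move data from the websocket to a stream interface. As for why read() is required at all: pipe() asks the source for data by calling read(), so the method must exist even if it does nothing because the data is pushed from elsewhere. This can be achieved using a Readable: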
import stream from 'stream';
const onSocketConnection = ( socket ) => {
  const readable = new stream.Readable({
    // The read logic is omitted since the data is pushed to the stream
    // from the socket handler below, outside of the stream's control.
    // However, the read() function must be defined.
    read(){}
  });
  socket.on('message', ( data ) => {
    // Push the data on the readable queue
    readable.push( data );
  });
  // ffmpeg is assumed to be a writable stream, e.g. the stdin of a
  // spawned ffmpeg child process.
  readable.pipe( ffmpeg );
}
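One thing the snippet above leaves open is how the destination learns that the data has ended. A rough sketch of one way to handle that is below. It assumes the socket emits 'message' and 'close' events. The socketToReadable helper, the fakeSocket emitter and the sink writable are stand-ins invented for this example so it can run without a real websocket or ffmpeg:

```javascript
const { Readable, Writable } = require('stream');
const { EventEmitter } = require('events');

// Hypothetical helper: wraps any emitter that fires 'message' and 'close'.
function socketToReadable(socket) {
  const readable = new Readable({ read() {} });
  socket.on('message', (data) => readable.push(data));
  // push(null) signals end-of-stream so the piped destination can finish.
  socket.on('close', () => readable.push(null));
  return readable;
}

// Stand-ins for the real websocket and for ffmpeg's stdin.
const fakeSocket = new EventEmitter();
const chunks = [];
const sink = new Writable({
  write(chunk, encoding, next) {
    chunks.push(chunk); // collect what the destination receives
    next();
  },
});

socketToReadable(fakeSocket).pipe(sink);
sink.on('finish', () => console.log(Buffer.concat(chunks).toString()));

fakeSocket.emit('message', Buffer.from('frame-1 '));
fakeSocket.emit('message', Buffer.from('frame-2'));
fakeSocket.emit('close');
```

In real code you would pass the actual socket and pipe into the ffmpeg process's stdin instead of sink. Note that push() returns false once the internal buffer is full, so a fast socket feeding a slow consumer will buffer in memory unless you act on that signal.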