Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to pipe multiple readable streams, from multiple api requests, to a single writeable stream?

- Desired Behaviour
- Actual Behaviour
- What I've Tried
- Steps To Reproduce
- Research


Desired Behaviour

Pipe multiple readable streams, received from multiple api requests, to a single writeable stream.

The api responses are from ibm-watson's textToSpeech.synthesize() method.

The reason multiple requests are required is because the service has a 5KB limit on text input.

Therefore a string of 18KB, for example, requires four requests to complete.

Actual Behaviour

The writeable stream file is incomplete and garbled.

The application seems to 'hang'.

When I try and open the incomplete .mp3 file in an audio player, it says it is corrupted.

The process of opening and closing the file seems to increase its file size - like opening the file somehow prompts more data to flow in to it.

Undesirable behaviour is more apparent with larger inputs, eg four strings of 4000 bytes or less.

What I've Tried

I've tried several methods to pipe the readable streams to either a single writeable stream or multiple writeable streams using the npm packages combined-stream, combined-stream2, multistream and archiver and they all result in incomplete files. My last attempt doesn't use any packages and is shown in the Steps To Reproduce section below.

I am therefore questioning each part of my application logic:

01. What is the response type of a watson text to speech api request?

The text to speech docs, say the api response type is:

Response type: NodeJS.ReadableStream|FileObject|Buffer

I am confused that the response type is one of three possible things.

In all my attempts, I have been assuming it is a readable stream.

02. Can I make multiple api requests in a map function?

03. Can I wrap each request within a promise() and resolve the response?

04. Can I assign the resulting array to a promises variable?

05. Can I declare var audio_files = await Promise.all(promises)?

06. After this declaration, are all responses 'finished'?

07. How do I correctly pipe each response to a writable stream?

08. How do I detect when all pipes have finished, so I can send file back to client?

For questions 2 - 6, I am assuming the answer is 'YES'.

I think my failures relate to question 7 and 8.

Steps To Reproduce

You can test this code with an array of four randomly generated text strings with a respective byte size of 3975, 3863, 3974 and 3629 bytes - here is a pastebin of that array.

// route handler
app.route("/api/:api_version/tts")
    .get(api_tts_get);

// route handler middleware
const api_tts_get = async (req, res) => {

    var query_parameters = req.query;

    var file_name = query_parameters.file_name;
    var text_string_array = text_string_array; // eg: https://pastebin.com/raw/JkK8ehwV

    var absolute_path = path.join(__dirname, "/src/temp_audio/", file_name);
    var relative_path = path.join("./src/temp_audio/", file_name); // path relative to server root

    // for each string in an array, send it to the watson api  
    var promises = text_string_array.map(text_string => {

        return new Promise((resolve, reject) => {

            // credentials
            var textToSpeech = new TextToSpeechV1({
                iam_apikey: iam_apikey,
                url: tts_service_url
            });

            // params  
            var synthesizeParams = {
                text: text_string,
                accept: 'audio/mp3',
                voice: 'en-US_AllisonV3Voice'
            };

            // make request  
            textToSpeech.synthesize(synthesizeParams, (err, audio) => {
                if (err) {
                    console.log("synthesize - an error occurred: ");
                    return reject(err);
                }
                resolve(audio);
            });

        });
    });

    try {
        // wait for all responses
        var audio_files = await Promise.all(promises);
        var audio_files_length = audio_files.length;

        var write_stream = fs.createWriteStream(`${relative_path}.mp3`);

        audio_files.forEach((audio, index) => {

            // if this is the last value in the array, 
            // pipe it to write_stream, 
            // when finished, the readable stream will emit 'end' 
            // then the .end() method will be called on write_stream  
            // which will trigger the 'finished' event on the write_stream    
            if (index == audio_files_length - 1) {
                audio.pipe(write_stream);
            }
            // if not the last value in the array, 
            // pipe to write_stream and leave open 
            else {
                audio.pipe(write_stream, { end: false });
            }

        });

        write_stream.on('finish', function() {

            // download the file (using absolute_path)  
            res.download(`${absolute_path}.mp3`, (err) => {
                if (err) {
                    console.log(err);
                }
                // delete the file (using relative_path)  
                fs.unlink(`${relative_path}.mp3`, (err) => {
                    if (err) {
                        console.log(err);
                    }
                });
            });

        });


    } catch (err) {
        console.log("there was an error getting tts");
        console.log(err);
    }

}

The official example shows:

textToSpeech.synthesize(synthesizeParams)
  .then(audio => {
    audio.pipe(fs.createWriteStream('hello_world.mp3'));
  })
  .catch(err => {
    console.log('error:', err);
  });

which seems to work fine for single requests, but not for multiple requests, as far as I can tell.

Research

concerning readable and writeable streams, readable stream modes (flowing and paused), 'data', 'end', 'drain' and 'finish' events, pipe(), fs.createReadStream() and fs.createWriteStream()


Almost all Node.js applications, no matter how simple, use streams in some manner...

const server = http.createServer((req, res) => {
// `req` is an http.IncomingMessage, which is a Readable Stream
// `res` is an http.ServerResponse, which is a Writable Stream

let body = '';
// get the data as utf8 strings.
// if an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');

// readable streams emit 'data' events once a listener is added
req.on('data', (chunk) => {
body += chunk;
});

// the 'end' event indicates that the entire body has been received
req.on('end', () => {
try {
const data = JSON.parse(body);
// write back something interesting to the user:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});

https://nodejs.org/api/stream.html#stream_api_for_stream_consumers


Readable streams have two main modes that affect the way we can consume them...they can be either in the paused mode or in the flowing mode. All readable streams start in the paused mode by default but they can be easily switched to flowing and back to paused when needed...just adding a data event handler switches a paused stream into flowing mode and removing the data event handler switches the stream back to paused mode.

https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93


Here’s a list of the important events and functions that can be used with readable and writable streams

enter image description here

The most important events on a readable stream are:

The data event, which is emitted whenever the stream passes a chunk of data to the consumer The end event, which is emitted when there is no more data to be consumed from the stream.

The most important events on a writable stream are:

The drain event, which is a signal that the writable stream can receive more data. The finish event, which is emitted when all data has been flushed to the underlying system.

https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93


.pipe() takes care of listening for 'data' and 'end' events from the fs.createReadStream().

https://github.com/substack/stream-handbook#why-you-should-use-streams


.pipe() is just a function that takes a readable source stream src and hooks the output to a destination writable stream dst

https://github.com/substack/stream-handbook#pipe


The return value of the pipe() method is the destination stream

https://flaviocopes.com/nodejs-streams/#pipe


By default, stream.end() is called on the destination Writable stream when the source Readable stream emits 'end', so that the destination is no longer writable. To disable this default behavior, the end option can be passed as false, causing the destination stream to remain open:

https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options


The 'finish' event is emitted after the stream.end() method has been called, and all data has been flushed to the underlying system.

const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`);
}
writer.end('This is the end\n');
writer.on('finish', () => {
  console.log('All writes are now complete.');
});

https://nodejs.org/api/stream.html#stream_event_finish


If you're trying to read multiple files and pipe them to a writable stream, you have to pipe each one to the writable stream and and pass end: false when doing it, because by default, a readable stream ends the writable stream when there's no more data to be read. Here's an example:

var ws = fs.createWriteStream('output.pdf');

fs.createReadStream('pdf-sample1.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample2.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample3.pdf').pipe(ws);

https://stackoverflow.com/a/30916248


You want to add the second read into an eventlistener for the first read to finish...

var a = fs.createReadStream('a');
var b = fs.createReadStream('b');
var c = fs.createWriteStream('c');
a.pipe(c, {end:false});
a.on('end', function() {
  b.pipe(c)
}

https://stackoverflow.com/a/28033554


A Brief History of Node Streams - part one and two.


Related Google search:

how to pipe multiple readable streams to a single writable stream? nodejs

Questions covering the same or similar topic, without authoritative answers (or might be 'outdated'):

How to pipe multiple ReadableStreams to a single WriteStream?

Piping to same Writable stream twice via different Readable stream

Pipe multiple files to one response

Creating a Node.js stream from two piped streams

like image 829
user1063287 Avatar asked Jul 23 '19 06:07

user1063287


People also ask

What is the correct way to Piper readable stream and writable stream?

To consume a readable stream, we can use the pipe / unpipe methods, or the read / unshift / resume methods. To consume a writable stream, we can make it the destination of pipe / unpipe , or just write to it with the write method and call the end method when we're done.

How do you read data from a readable stream?

The ReadableStream() constructor The constructor takes two objects as parameters. The first object is required, and creates a model in JavaScript of the underlying source the data is being read from. The second object is optional, and allows you to specify a custom queuing strategy to use for your stream.

How do I use Readstream?

createReadStream() allows you to open up a readable stream in a very simple manner. All you have to do is pass the path of the file to start streaming in. It turns out that the response (as well as the request) objects are streams. So we will use this fact to create a http server that streams the files to the client.

What are duplex streams?

- A duplex stream is a stream that implements both a readable and a writable. These streams allow data to pass through. Readable streams will pipe data into a duplex stream, and the duplex stream can also write that data. So duplex stream represent the middle sections of pipelines.


1 Answers

The core problem to solve here is asynchronicity. You almost had it: the problem with the code you posted is that you are piping all source streams in parallel & unordered into the target stream. This means data chunks will flow randomly from different audio streams - even your end event will outrace the pipes without end closing the target stream too early, which might explain why it increases after you re-open it.

What you want is to pipe them sequentially - you even posted the solution when you quoted

You want to add the second read into an eventlistener for the first read to finish...

or as code:

a.pipe(c, { end:false });
a.on('end', function() {
  b.pipe(c);
}

This will pipe the source streams in sequential order into the target stream.

Taking your code this would mean to replace the audio_files.forEach loop with:

await Bluebird.mapSeries(audio_files, async (audio, index) => {  
    const isLastIndex = index == audio_files_length - 1;
    audio.pipe(write_stream, { end: isLastIndex });
    return new Promise(resolve => audio.on('end', resolve));
});

Note the usage of bluebird.js mapSeries here.

Further advice regarding your code:

  • you should consider using lodash.js
  • you should use const & let instead of var and consider using camelCase
  • when you notice "it works with one event, but fails with multiple" always think: asynchronicity, permutations, race conditions.

Further reading, limitations of combining native node streams: https://github.com/nodejs/node/issues/93

like image 152
B M Avatar answered Oct 23 '22 12:10

B M