 

NodeJS streams and premature end

Assuming a Readable Stream in NodeJS and a Data (on('data', ...)) event handler tied to it that is relatively slow, is it possible for the End event to fire before the last Data handler(s) has finished, and if so, will it prematurely terminate that handler? Or, will all Data events get dispatched and run?

In my case, I am working with large files and want to commit to a DB every data chunk. I am worried that I may lose the last record or two (or more) if End is fired before the last DB calls in the handler actually complete.

asked Oct 06 '15 by alphadogg



1 Answer

The 'end' event fires after the last 'data' event, but it can fire before the work started by the last 'data' handler has finished. Note that 'end' does not terminate a running handler (Node.js never cancels a callback that has already started); the risk is ordering. Before one 'data' handler's asynchronous work has finished, the next handler may start, so depending on what your code does, work started by a later 'data' event can finish before work started by an earlier one. This can cause errors and inconsistencies in your code.

An example of how to cause such problems (for your own tests):

var fs = require('fs');
var rr = fs.createReadStream('somebigfile.jpg');
var i = 0;
rr.on('data', function(chunk) {
  i++;
  var s = i;
  console.log('data:' + s);
  // Each later chunk gets a shorter timeout, so the "work" for later
  // chunks can finish before the work for earlier ones.
  setTimeout(function() {
    console.log('timeout:' + s);
  }, 50 - i * 10);
});
rr.on('end', function() {
  console.log('end');
});

It prints to your console when each 'data' event handler starts, and again some milliseconds later when its simulated work finishes. The finish messages may appear in a different order than the start messages.

Solution:

Readable streams have two modes, 'flowing mode' and 'paused mode'. When you add a 'data' event handler, you automatically switch the stream to flowing mode.

From the documentation:

When in flowing mode, data is read from the underlying system and provided to your program as fast as possible

In this mode, events will not wait for your slow actions to finish. What you need is 'paused mode'.

From the documentation:

In paused mode, you must explicitly call stream.read() to get chunks of data out. Streams start out in paused mode.

In other words: you request a chunk of data, you get it, you work with it, and when you are ready you ask for a new chunk. In this mode you control when you receive your data.

How to change to 'paused mode':

Paused mode is the default for a stream, but registering a 'data' event handler switches it to flowing mode. So do not use readstream.on('data', ...). Instead, use readstream.on('readable', function() {...}); when it fires, the stream is ready to give you a chunk of data, which you get with var chunk = readstream.read();

Example from docs:

var fs = require('fs');
var rr = fs.createReadStream('foo.txt');
rr.on('readable', function() {
  console.log('readable:', rr.read());
});
rr.on('end', function() {
  console.log('end');
});
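
Applied to the question's use case (an asynchronous DB commit per chunk), paused mode lets you serialize the work: read one chunk, wait for its write to finish, then read the next. Below is a minimal sketch; saveChunkToDb is a hypothetical stand-in for your real DB call, and the bookkeeping around 'end' is one way to make sure the final write completes before you clean up:

var fs = require('fs');
var rr = fs.createReadStream('somebigfile.jpg');

// Hypothetical async DB call; replace with your real insert/commit.
function saveChunkToDb(chunk, callback) {
  setTimeout(callback, 100); // simulate a slow write
}

var busy = false;   // is a DB write currently in progress?
var ended = false;  // has the stream emitted 'end'?

function maybeFinish() {
  if (ended && !busy) {
    console.log('all chunks committed'); // safe to close the DB here
  }
}

function processNext() {
  if (busy) return;            // wait for the current write to finish
  var chunk = rr.read();       // returns null when nothing is buffered
  if (chunk === null) {
    maybeFinish();
    return;                    // the next 'readable' event will retry
  }
  busy = true;
  saveChunkToDb(chunk, function() {
    busy = false;
    processNext();             // drain any chunks buffered meanwhile
  });
}

rr.on('readable', processNext);
rr.on('end', function() {
  ended = true;
  maybeFinish();
});

Because the next read() happens only after the previous write's callback, chunks are committed strictly in order, and no record is lost at the end.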

Please read the documentation for more details, because there are other situations in which a stream is automatically switched to flowing mode.

Working with slow handlers in flowing mode:

If you want or need to work in flowing mode, there is also a solution: you can pause and resume the stream. When you get a chunk from the 'data' event, pause the stream, and when your work is finished, resume it.

Example from documentation:

var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('got %d bytes of data', chunk.length);
  readable.pause();
  console.log('there will be no more data for 1 second');
  setTimeout(function() {
    console.log('now data will start flowing again');
    readable.resume();
  }, 1000);
});
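
Applied to the question's DB scenario, the same pause/resume pattern keeps flowing mode but ensures only one write is in flight at a time. Again a minimal sketch, with the same hypothetical saveChunkToDb stand-in; the counter guards against exactly the concern in the question, namely 'end' arriving while the last write is still pending:

var fs = require('fs');
var readable = fs.createReadStream('somebigfile.jpg');

// Hypothetical async DB call; replace with your real insert/commit.
function saveChunkToDb(chunk, callback) {
  setTimeout(callback, 100); // simulate a slow write
}

var pendingWrites = 0;
var ended = false;

readable.on('data', function(chunk) {
  readable.pause();                 // no more 'data' events until resume()
  pendingWrites++;
  saveChunkToDb(chunk, function() {
    pendingWrites--;
    if (ended && pendingWrites === 0) {
      console.log('all chunks committed'); // last write finished after 'end'
    }
    readable.resume();              // ask for the next chunk
  });
});

readable.on('end', function() {
  ended = true;
  if (pendingWrites === 0) {
    console.log('all chunks committed');
  }
});

The running handler itself is never terminated by 'end' (Node.js does not cancel callbacks that have started); the counter just makes sure you do not treat the stream as finished, for example by closing the DB connection, until every write has called back.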
answered Nov 15 '22 by Krzysztof Sztompka