How to read lines of a file with node.js or JavaScript with a delay, not in non-blocking fashion?

I am reading a file (300,000 lines) in node.js. I want to send the lines in batches of 5,000 to another application (Elasticsearch) to store them. So whenever I finish reading 5,000 lines, I want to send them in bulk to Elasticsearch through an API, then keep reading the rest of the file and send every 5,000 lines in bulk.

If I want to use Java (or any other blocking language such as C, C++, Python, etc.) for this task, I would do something like this:

int countLines = 0;
String bulkString = "";
String currentLine;
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("filePath.txt")));
while ((currentLine = br.readLine()) != null) {
     countLines++;
     bulkString += currentLine;
     if (countLines >= 5000) {
          //send bulkString to Elasticsearch via APIs
          countLines = 0;
          bulkString = "";
     }
}
br.close();

If I want to do the same thing with node.js, I will do:

var fs = require('fs');
var readline = require('readline');
// client is an Elasticsearch client instance created elsewhere

var countLines = 0;
var bulkString = "";
var instream = fs.createReadStream('filePath.txt');
var rl = readline.createInterface({ input: instream });
rl.on('line', function(line) {
     countLines++;
     bulkString += line + "\n";
     if (countLines >= 5000) {
          //send bulkString via the Elasticsearch bulk API
          client.bulk({
               index: 'indexName',
               type: 'type',
               body: [bulkString]
          }, function (error, response) {
               //task is done
          });
          countLines = 0;
          bulkString = "";
     }
});

The problem with node.js is that it is non-blocking, so it doesn't wait for the first API response before sending the next batch of lines. I know this counts as a benefit of node.js because it does not wait for I/O, but the problem is that it sends too much data to Elasticsearch. Therefore Elasticsearch's queue fills up and it throws exceptions.

My question is: how can I make node.js wait for the response from the API before it continues reading the next lines, or before it sends the next batch of lines to Elasticsearch?

I know I can set some parameters in Elasticsearch to increase the queue size, but I am interested in blocking behavior on the node.js side for this issue. I am familiar with the concept of callbacks, but I cannot think of a way to use callbacks in this scenario to prevent node.js from calling the Elasticsearch API in non-blocking mode.

asked Jun 02 '15 by Soheil


People also ask

How do I read a file line by line in NodeJS?

Method 1: Using the readline module. readline is a native module of Node.js; it was developed specifically for reading content line by line from any readable stream. It can also be used to read data from the command line: const readline = require('readline');
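For illustration, a minimal sketch of reading a file line by line with readline (the file name is only a placeholder):

const fs = require('fs');
const readline = require('readline');

// Stream the file and emit one 'line' event per line read.
const rl = readline.createInterface({
     input: fs.createReadStream('filePath.txt')
});

rl.on('line', (line) => {
     console.log(line);
});

rl.on('close', () => {
     console.log('Finished reading the file.');
});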

Is NodeJS blocking or non-blocking?

Node.js is based on an event-driven, non-blocking I/O model.

How does NodeJS handle IO operations in a way that they don't block code execution?

Instead of the process being blocked and waiting for I/O operations to complete, the I/O operations are delegated to the system, so that the process can execute the next piece of code. Non-blocking I/O operations provide a callback function that is called when the operation is completed.
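For example, a small sketch of a non-blocking read with a completion callback (the file name is only a placeholder):

const fs = require('fs');

// fs.readFile delegates the read to the system and returns immediately;
// the callback runs once the whole file has been read (or an error occurred).
fs.readFile('filePath.txt', 'utf8', (err, data) => {
     if (err) {
          console.error(err);
          return;
     }
     console.log('File length:', data.length);
});

console.log('This line runs before the file has been read.');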

How does NodeJS overcome the problem of blocking of I O operations?

Note: The solution to this problem in Node.js is to use asynchronous, non-blocking code, and Node.js uses an event loop for this. An event loop is "an object that handles and processes external events and converts them into callback calls."


2 Answers

Pierre's answer is correct. I just want to add code that shows how we can benefit from the non-blocking nature of node.js while, at the same time, not overwhelming Elasticsearch with too many requests at once.

Here is pseudo-code you can use to make the solution flexible by setting a limit on the queue size:

// fs, readline, and the Elasticsearch client (client) are set up as in the question
var countLines = 0;
var bulkString = "";
var queueSize = 3; // at most 3 bulk requests will be in flight to the Elasticsearch server
var batchesAlreadyInQueue = 0;
var instream = fs.createReadStream('filePath.txt');
var rl = readline.createInterface({ input: instream });
rl.on('line', function(line) {
     countLines++;
     bulkString += line + "\n";
     if (countLines >= 5000) {
          //send bulkString via the bulk API
          batchesAlreadyInQueue++;
          client.bulk({
               index: 'indexName',
               type: 'type',
               body: [bulkString]
          }, function (error, response) {
               //task is done
               //one request has completed, so decrease the in-flight count and resume reading
               batchesAlreadyInQueue--;
               rl.resume();
          });
          if (batchesAlreadyInQueue >= queueSize) {
               rl.pause();
          }
          countLines = 0;
          bulkString = "";
     }
});
answered Oct 19 '22 by Soheil


Use rl.pause() right after your if and rl.resume() after your //task is done.

Note that you may receive a few more line events after calling pause.
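For reference, a minimal sketch of where those calls go, reusing the variables and client from the question's snippet:

rl.on('line', function(line) {
     countLines++;
     bulkString += line + "\n";
     if (countLines >= 5000) {
          rl.pause(); // stop the flow of 'line' events while the request is in flight
          client.bulk({
               index: 'indexName',
               type: 'type',
               body: [bulkString]
          }, function (error, response) {
               //task is done
               rl.resume(); // continue reading once Elasticsearch has responded
          });
          countLines = 0;
          bulkString = "";
     }
});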

answered Oct 19 '22 by Pierre Inglebert